1
0
mirror of https://github.com/chubin/cheat.sh.git synced 2026-06-20 05:06:44 +02:00

Merge pull request #420 from chubin/ic.black

Format python code using black
This commit is contained in:
Igor Chubin
2025-07-29 20:12:44 +02:00
committed by GitHub
41 changed files with 1605 additions and 1110 deletions
+87 -57
View File
@@ -17,9 +17,10 @@ Configuration parameters:
from __future__ import print_function
import sys
if sys.version_info[0] < 3:
reload(sys)
sys.setdefaultencoding('utf8')
sys.setdefaultencoding("utf8")
import sys
import logging
@@ -43,7 +44,8 @@ if not os.path.exists(os.path.dirname(CONFIG["path.log.main"])):
logging.basicConfig(
filename=CONFIG["path.log.main"],
level=logging.DEBUG,
format='%(asctime)s %(message)s')
format="%(asctime)s %(message)s",
)
# Fix Flask "exception and request logging" to `stderr`.
#
# When Flask's werkzeug detects that logging is already set, it
@@ -52,7 +54,9 @@ stderr_handler = logging.StreamHandler()
logging.getLogger().addHandler(stderr_handler)
#
# Alter log format to disting log lines from everything else
stderr_handler.setFormatter(logging.Formatter('%(filename)s:%(lineno)s: %(message)s'))
stderr_handler.setFormatter(logging.Formatter("%(filename)s:%(lineno)s: %(message)s"))
#
# Sometimes werkzeug starts logging before an app is imported
# (https://github.com/pallets/werkzeug/issues/1969)
@@ -60,16 +64,18 @@ stderr_handler.setFormatter(logging.Formatter('%(filename)s:%(lineno)s: %(messag
# stderr handler to skip lines from werkzeug.
class SkipFlaskLogger(object):
def filter(self, record):
if record.name != 'werkzeug':
if record.name != "werkzeug":
return True
if logging.getLogger('werkzeug').handlers:
if logging.getLogger("werkzeug").handlers:
stderr_handler.addFilter(SkipFlaskLogger())
app = Flask(__name__) # pylint: disable=invalid-name
app.jinja_loader = jinja2.ChoiceLoader([
app.jinja_loader,
jinja2.FileSystemLoader(CONFIG["path.internal.templates"])])
app = Flask(__name__) # pylint: disable=invalid-name
app.jinja_loader = jinja2.ChoiceLoader(
[app.jinja_loader, jinja2.FileSystemLoader(CONFIG["path.internal.templates"])]
)
LIMITS = Limits()
@@ -85,16 +91,19 @@ PLAIN_TEXT_AGENTS = [
"aiohttp",
]
def _is_html_needed(user_agent):
"""
Basing on `user_agent`, return whether it needs HTML or ANSI
"""
return all([x not in user_agent for x in PLAIN_TEXT_AGENTS])
def is_result_a_script(query):
return query in [':cht.sh']
@app.route('/files/<path:path>')
def is_result_a_script(query):
return query in [":cht.sh"]
@app.route("/files/<path:path>")
def send_static(path):
"""
Return static file `path`.
@@ -102,15 +111,17 @@ def send_static(path):
"""
return send_from_directory(CONFIG["path.internal.static"], path)
@app.route('/favicon.ico')
@app.route("/favicon.ico")
def send_favicon():
"""
Return static file `favicon.ico`.
Can be served by the HTTP frontend.
"""
return send_from_directory(CONFIG["path.internal.static"], 'favicon.ico')
return send_from_directory(CONFIG["path.internal.static"], "favicon.ico")
@app.route('/malformed-response.html')
@app.route("/malformed-response.html")
def send_malformed():
"""
Return static file `malformed-response.html`.
@@ -119,13 +130,15 @@ def send_malformed():
dirname, filename = os.path.split(CONFIG["path.internal.malformed"])
return send_from_directory(dirname, filename)
def log_query(ip_addr, found, topic, user_agent):
"""
Log processed query and some internal data
"""
log_entry = "%s %s %s %s\n" % (ip_addr, found, topic, user_agent)
with open(CONFIG["path.log.queries"], 'ab') as my_file:
my_file.write(log_entry.encode('utf-8'))
with open(CONFIG["path.log.queries"], "ab") as my_file:
my_file.write(log_entry.encode("utf-8"))
def get_request_ip(req):
"""
@@ -134,19 +147,20 @@ def get_request_ip(req):
if req.headers.getlist("X-Forwarded-For"):
ip_addr = req.headers.getlist("X-Forwarded-For")[0]
if ip_addr.startswith('::ffff:'):
if ip_addr.startswith("::ffff:"):
ip_addr = ip_addr[7:]
else:
ip_addr = req.remote_addr
if req.headers.getlist("X-Forwarded-For"):
ip_addr = req.headers.getlist("X-Forwarded-For")[0]
if ip_addr.startswith('::ffff:'):
if ip_addr.startswith("::ffff:"):
ip_addr = ip_addr[7:]
else:
ip_addr = req.remote_addr
return ip_addr
def get_answer_language(request):
"""
Return preferred answer language based on
@@ -174,26 +188,26 @@ def get_answer_language(request):
def _find_supported_language(accepted_languages):
for lang_tuple in accepted_languages:
lang = lang_tuple[0]
if '-' in lang:
lang = lang.split('-', 1)[0]
if "-" in lang:
lang = lang.split("-", 1)[0]
return lang
return None
lang = None
hostname = request.headers['Host']
if hostname.endswith('.cheat.sh'):
hostname = request.headers["Host"]
if hostname.endswith(".cheat.sh"):
lang = hostname[:-9]
if 'lang' in request.args:
lang = request.args.get('lang')
if "lang" in request.args:
lang = request.args.get("lang")
header_accept_language = request.headers.get('Accept-Language', '')
header_accept_language = request.headers.get("Accept-Language", "")
if lang is None and header_accept_language:
lang = _find_supported_language(
_parse_accept_language(header_accept_language))
lang = _find_supported_language(_parse_accept_language(header_accept_language))
return lang
def _proxy(*args, **kwargs):
# print "method=", request.method,
# print "url=", request.url.replace('/:shell-x/', ':3000/')
@@ -202,11 +216,11 @@ def _proxy(*args, **kwargs):
# print "cookies=", request.cookies
# print "allow_redirects=", False
url_before, url_after = request.url.split('/:shell-x/', 1)
url = url_before + ':3000/'
url_before, url_after = request.url.split("/:shell-x/", 1)
url = url_before + ":3000/"
if 'q' in request.args:
url_after = '?' + "&".join("arg=%s" % x for x in request.args['q'].split())
if "q" in request.args:
url_after = "?" + "&".join("arg=%s" % x for x in request.args["q"].split())
url += url_after
print(url)
@@ -214,20 +228,29 @@ def _proxy(*args, **kwargs):
resp = requests.request(
method=request.method,
url=url,
headers={key: value for (key, value) in request.headers if key != 'Host'},
headers={key: value for (key, value) in request.headers if key != "Host"},
data=request.get_data(),
cookies=request.cookies,
allow_redirects=False)
allow_redirects=False,
)
excluded_headers = ['content-encoding', 'content-length', 'transfer-encoding', 'connection']
headers = [(name, value) for (name, value) in resp.raw.headers.items()
if name.lower() not in excluded_headers]
excluded_headers = [
"content-encoding",
"content-length",
"transfer-encoding",
"connection",
]
headers = [
(name, value)
for (name, value) in resp.raw.headers.items()
if name.lower() not in excluded_headers
]
response = Response(resp.content, resp.status_code, headers)
return response
@app.route("/", methods=['GET', 'POST'])
@app.route("/", methods=["GET", "POST"])
@app.route("/<path:topic>", methods=["GET", "POST"])
def answer(topic=None):
"""
@@ -242,16 +265,19 @@ def answer(topic=None):
request.query_string
"""
user_agent = request.headers.get('User-Agent', '').lower()
user_agent = request.headers.get("User-Agent", "").lower()
html_needed = _is_html_needed(user_agent)
options = parse_args(request.args)
if topic in ['apple-touch-icon-precomposed.png', 'apple-touch-icon.png', 'apple-touch-icon-120x120-precomposed.png'] \
or (topic is not None and any(topic.endswith('/'+x) for x in ['favicon.ico'])):
return ''
if topic in [
"apple-touch-icon-precomposed.png",
"apple-touch-icon.png",
"apple-touch-icon-120x120-precomposed.png",
] or (topic is not None and any(topic.endswith("/" + x) for x in ["favicon.ico"])):
return ""
request_id = request.cookies.get('id')
if topic is not None and topic.lstrip('/') == ':last':
request_id = request.cookies.get("id")
if topic is not None and topic.lstrip("/") == ":last":
if request_id:
topic = last_query(request_id)
else:
@@ -260,43 +286,47 @@ def answer(topic=None):
if request_id:
save_query(request_id, topic)
if request.method == 'POST':
if request.method == "POST":
process_post_request(request, html_needed)
if html_needed:
return redirect("/")
return "OK\n"
if 'topic' in request.args:
return redirect("/%s" % request.args.get('topic'))
if "topic" in request.args:
return redirect("/%s" % request.args.get("topic"))
if topic is None:
topic = ":firstpage"
if topic.startswith(':shell-x/'):
if topic.startswith(":shell-x/"):
return _proxy()
#return requests.get('http://127.0.0.1:3000'+topic[8:]).text
# return requests.get('http://127.0.0.1:3000'+topic[8:]).text
lang = get_answer_language(request)
if lang:
options['lang'] = lang
options["lang"] = lang
ip_address = get_request_ip(request)
if '+' in topic:
if "+" in topic:
not_allowed = LIMITS.check_ip(ip_address)
if not_allowed:
return "429 %s\n" % not_allowed, 429
html_is_needed = _is_html_needed(user_agent) and not is_result_a_script(topic)
if html_is_needed:
output_format='html'
output_format = "html"
else:
output_format='ansi'
result, found = cheat_wrapper(topic, request_options=options, output_format=output_format)
if 'Please come back in several hours' in result and html_is_needed:
malformed_response = open(os.path.join(CONFIG["path.internal.malformed"])).read()
output_format = "ansi"
result, found = cheat_wrapper(
topic, request_options=options, output_format=output_format
)
if "Please come back in several hours" in result and html_is_needed:
malformed_response = open(
os.path.join(CONFIG["path.internal.malformed"])
).read()
return malformed_response
log_query(ip_address, found, topic, user_agent)
if html_is_needed:
return result
return Response(result, mimetype='text/plain')
return Response(result, mimetype="text/plain")
+2 -2
View File
@@ -1,7 +1,7 @@
import sys
import redis
REDIS = redis.Redis(host='localhost', port=6379, db=0)
REDIS = redis.Redis(host="localhost", port=6379, db=0)
for key in sys.argv[1:]:
REDIS.delete(key)
+6 -5
View File
@@ -5,6 +5,7 @@
from gevent.monkey import patch_all
from gevent.pywsgi import WSGIServer
patch_all()
import os
@@ -13,16 +14,16 @@ import sys
from app import app, CONFIG
if '--debug' in sys.argv:
if "--debug" in sys.argv:
# Not all debug mode features are available under `gevent`
# https://github.com/pallets/flask/issues/3825
app.debug = True
if 'CHEATSH_PORT' in os.environ:
port = int(os.environ.get('CHEATSH_PORT'))
if "CHEATSH_PORT" in os.environ:
port = int(os.environ.get("CHEATSH_PORT"))
else:
port = CONFIG['server.port']
port = CONFIG["server.port"]
srv = WSGIServer((CONFIG['server.bind'], port), app)
srv = WSGIServer((CONFIG["server.bind"], port), app)
print("Starting gevent server on {}:{}".format(srv.address[0], srv.address[1]))
srv.serve_forever()
+2 -1
View File
@@ -12,7 +12,8 @@ import glob
__all__ = [
basename(f)[:-3]
for f in glob.glob(join(dirname(__file__), "*.py"))
if isfile(f) and not f.endswith('__init__.py')]
if isfile(f) and not f.endswith("__init__.py")
]
from .adapter import all_adapters
from . import *
+39 -30
View File
@@ -11,16 +11,19 @@ import os
from six import with_metaclass
from config import CONFIG
class AdapterMC(type):
"""
Adapter Metaclass.
Defines string representation of adapters
"""
def __repr__(cls):
if hasattr(cls, '_class_repr'):
return getattr(cls, '_class_repr')()
if hasattr(cls, "_class_repr"):
return getattr(cls, "_class_repr")()
return super(AdapterMC, cls).__repr__()
class Adapter(with_metaclass(AdapterMC, object)):
"""
An abstract class, defines methods:
@@ -39,7 +42,7 @@ class Adapter(with_metaclass(AdapterMC, object)):
"""
_adapter_name = None
_output_format = 'code'
_output_format = "code"
_cache_needed = False
_repository_url = None
_local_repository_location = None
@@ -49,7 +52,7 @@ class Adapter(with_metaclass(AdapterMC, object)):
@classmethod
def _class_repr(cls):
return '[Adapter: %s (%s)]' % (cls._adapter_name, cls.__name__)
return "[Adapter: %s (%s)]" % (cls._adapter_name, cls.__name__)
def __init__(self):
self._list = {None: self._get_list()}
@@ -108,13 +111,13 @@ class Adapter(with_metaclass(AdapterMC, object)):
pass
def _get_output_format(self, topic):
if '/' in topic:
subquery = topic.split('/')[-1]
if "/" in topic:
subquery = topic.split("/")[-1]
else:
subquery = topic
if subquery in [':list']:
return 'text'
if subquery in [":list"]:
return "text"
return self._output_format
# pylint: disable=unused-argument
@@ -142,11 +145,11 @@ class Adapter(with_metaclass(AdapterMC, object)):
answer = {"answer": answer}
answer_dict = {
'topic': topic,
'topic_type': self._adapter_name,
'format': self._get_output_format(topic),
'cache': self._cache_needed,
}
"topic": topic,
"topic_type": self._adapter_name,
"format": self._get_output_format(topic),
"cache": self._cache_needed,
}
answer_dict.update(answer)
# pylint: disable=assignment-from-none
@@ -176,9 +179,9 @@ class Adapter(with_metaclass(AdapterMC, object)):
if not dirname and cls._repository_url:
dirname = cls._repository_url
if dirname.startswith('https://'):
if dirname.startswith("https://"):
dirname = dirname[8:]
elif dirname.startswith('http://'):
elif dirname.startswith("http://"):
dirname = dirname[7:]
# if we did not manage to find out dirname up to this point,
@@ -187,7 +190,7 @@ class Adapter(with_metaclass(AdapterMC, object)):
if not dirname:
return None
if dirname.startswith('/'):
if dirname.startswith("/"):
return dirname
# it is possible that several repositories will
@@ -195,10 +198,10 @@ class Adapter(with_metaclass(AdapterMC, object)):
# (because only the last part of the path is used)
# in this case provide the name in _local_repository_location
# (detected by fetch.py)
if '/' in dirname:
dirname = dirname.split('/')[-1]
if "/" in dirname:
dirname = dirname.split("/")[-1]
path = os.path.join(CONFIG['path.repositories'], dirname)
path = os.path.join(CONFIG["path.repositories"], dirname)
if cheat_sheets_location:
path = os.path.join(path, cls._cheatsheet_files_prefix)
@@ -225,7 +228,8 @@ class Adapter(with_metaclass(AdapterMC, object)):
# in this case `fetch` has to be implemented
# in the distinct adapter subclass
raise RuntimeError(
"Do not known how to handle this repository: %s" % cls._repository_url)
"Do not known how to handle this repository: %s" % cls._repository_url
)
@classmethod
def update_command(cls):
@@ -245,7 +249,8 @@ class Adapter(with_metaclass(AdapterMC, object)):
# in this case `update` has to be implemented
# in the distinct adapter subclass
raise RuntimeError(
"Do not known how to handle this repository: %s" % cls._repository_url)
"Do not known how to handle this repository: %s" % cls._repository_url
)
@classmethod
def current_state_command(cls):
@@ -264,7 +269,8 @@ class Adapter(with_metaclass(AdapterMC, object)):
# in this case `update` has to be implemented
# in the distinct adapter subclass
raise RuntimeError(
"Do not known how to handle this repository: %s" % cls._repository_url)
"Do not known how to handle this repository: %s" % cls._repository_url
)
@classmethod
def save_state(cls, state):
@@ -273,8 +279,8 @@ class Adapter(with_metaclass(AdapterMC, object)):
Must be called after the cache clean up.
"""
local_repository_dir = cls.local_repository_location()
state_filename = os.path.join(local_repository_dir, '.cached_revision')
open(state_filename, 'w').write(state)
state_filename = os.path.join(local_repository_dir, ".cached_revision")
open(state_filename, "w").write(state)
@classmethod
def get_state(cls):
@@ -284,10 +290,10 @@ class Adapter(with_metaclass(AdapterMC, object)):
"""
local_repository_dir = cls.local_repository_location()
state_filename = os.path.join(local_repository_dir, '.cached_revision')
state_filename = os.path.join(local_repository_dir, ".cached_revision")
state = None
if os.path.exists(state_filename):
state = open(state_filename, 'r').read()
state = open(state_filename, "r").read()
return state
@classmethod
@@ -317,20 +323,23 @@ class Adapter(with_metaclass(AdapterMC, object)):
answer.append(entry)
return answer
def all_adapters(as_dict=False):
"""
Return list of all known adapters
If `as_dict` is True, return dict {'name': adapter} instead of a list.
"""
def _all_subclasses(cls):
return set(cls.__subclasses__()).union(set(
[s for c in cls.__subclasses__() for s in _all_subclasses(c)]
))
return set(cls.__subclasses__()).union(
set([s for c in cls.__subclasses__() for s in _all_subclasses(c)])
)
if as_dict:
return {x.name():x for x in _all_subclasses(Adapter)}
return {x.name(): x for x in _all_subclasses(Adapter)}
return list(_all_subclasses(Adapter))
def adapter_by_name(name):
"""
Return adapter having this name,
+1
View File
@@ -9,6 +9,7 @@ Each cheat sheet is a separate file without extension
from .git_adapter import GitRepositoryAdapter
class Cheat(GitRepositoryAdapter):
"""
cheat/cheat adapter
+32 -28
View File
@@ -11,24 +11,26 @@ import glob
from .git_adapter import GitRepositoryAdapter
def _remove_initial_underscore(filename):
if filename.startswith('_'):
if filename.startswith("_"):
filename = filename[1:]
return filename
def _sanitize_dirnames(filename, restore=False):
"""
Remove (or add) leading _ in the directories names in `filename`
The `restore` param means that the path name should be restored from the queryname,
i.e. conversion should be done in the opposite direction
"""
parts = filename.split('/')
parts = filename.split("/")
newparts = []
for part in parts[:-1]:
if restore:
newparts.append('_'+part)
newparts.append("_" + part)
continue
if part.startswith('_'):
if part.startswith("_"):
newparts.append(part[1:])
else:
newparts.append(part)
@@ -36,8 +38,8 @@ def _sanitize_dirnames(filename, restore=False):
return "/".join(newparts)
class CheatSheets(GitRepositoryAdapter):
class CheatSheets(GitRepositoryAdapter):
"""
Adapter for the cheat.sheets cheat sheets.
"""
@@ -56,18 +58,17 @@ class CheatSheets(GitRepositoryAdapter):
hidden_files = ["_info.yaml"]
answer = []
prefix = os.path.join(
self.local_repository_location(),
self._cheatsheet_files_prefix)
for mask in ['*', '*/*']:
template = os.path.join(
prefix,
mask)
self.local_repository_location(), self._cheatsheet_files_prefix
)
for mask in ["*", "*/*"]:
template = os.path.join(prefix, mask)
answer += [
_sanitize_dirnames(f_name[len(prefix):])
_sanitize_dirnames(f_name[len(prefix) :])
for f_name in glob.glob(template)
if not os.path.isdir(f_name)
and os.path.basename(f_name) not in hidden_files]
and os.path.basename(f_name) not in hidden_files
]
return sorted(answer)
@@ -76,18 +77,19 @@ class CheatSheets(GitRepositoryAdapter):
filename = os.path.join(
self.local_repository_location(),
self._cheatsheet_files_prefix,
_sanitize_dirnames(topic, restore=True))
_sanitize_dirnames(topic, restore=True),
)
if os.path.exists(filename):
answer = self._format_page(open(filename, 'r').read())
answer = self._format_page(open(filename, "r").read())
else:
# though it should not happen
answer = "%s:%s not found" % (str(self.__class__), topic)
return answer
class CheatSheetsDir(CheatSheets):
class CheatSheetsDir(CheatSheets):
"""
Adapter for the cheat sheets directories.
Provides pages named according to subdirectories:
@@ -103,14 +105,16 @@ class CheatSheetsDir(CheatSheets):
def _get_list(self, prefix=None):
template = os.path.join(
self.local_repository_location(),
self._cheatsheet_files_prefix,
'*')
self.local_repository_location(), self._cheatsheet_files_prefix, "*"
)
answer = sorted([
_remove_initial_underscore(os.path.basename(f_name)) + "/"
for f_name in glob.glob(template)
if os.path.isdir(f_name)])
answer = sorted(
[
_remove_initial_underscore(os.path.basename(f_name)) + "/"
for f_name in glob.glob(template)
if os.path.isdir(f_name)
]
)
return answer
@@ -122,12 +126,12 @@ class CheatSheetsDir(CheatSheets):
template = os.path.join(
self.local_repository_location(),
self._cheatsheet_files_prefix,
topic.rstrip('/'),
'*')
topic.rstrip("/"),
"*",
)
answer = sorted([
os.path.basename(f_name) for f_name in glob.glob(template)])
answer = sorted([os.path.basename(f_name) for f_name in glob.glob(template)])
return "\n".join(answer) + "\n"
def is_found(self, topic):
return CheatSheets.is_found(self, topic.rstrip('/'))
return CheatSheets.is_found(self, topic.rstrip("/"))
+27 -21
View File
@@ -1,5 +1,4 @@
"""
"""
""" """
# pylint: disable=unused-argument,abstract-method
@@ -19,13 +18,12 @@ def _get_abspath(path):
return path
import __main__
return os.path.join(
os.path.dirname(os.path.dirname(__main__.__file__)),
path)
return os.path.join(os.path.dirname(os.path.dirname(__main__.__file__)), path)
class CommandAdapter(Adapter):
"""
"""
""" """
_command = []
@@ -37,14 +35,17 @@ class CommandAdapter(Adapter):
if cmd:
try:
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
answer = proc.communicate()[0].decode('utf-8', 'ignore')
answer = proc.communicate()[0].decode("utf-8", "ignore")
except OSError:
return "ERROR of the \"%s\" adapter: please create an issue" % self._adapter_name
return (
'ERROR of the "%s" adapter: please create an issue'
% self._adapter_name
)
return answer
return ""
class Fosdem(CommandAdapter):
class Fosdem(CommandAdapter):
"""
Show the output of the `current-fosdem-slide` command,
which shows the current slide open in some terminal.
@@ -66,22 +67,26 @@ class Fosdem(CommandAdapter):
_pages_list = [":fosdem"]
_command = ["sudo", "/usr/local/bin/current-fosdem-slide"]
class Translation(CommandAdapter):
"""
"""
""" """
_adapter_name = "translation"
_output_format = "text"
_cache_needed = True
def _get_page(self, topic, request_options=None):
from_, topic = topic.split('/', 1)
to_ = request_options.get('lang', 'en')
if '-' in from_:
from_, to_ = from_.split('-', 1)
from_, topic = topic.split("/", 1)
to_ = request_options.get("lang", "en")
if "-" in from_:
from_, to_ = from_.split("-", 1)
return ["/home/igor/cheat.sh/bin/get_translation",
from_, to_, topic.replace('+', ' ')]
return [
"/home/igor/cheat.sh/bin/get_translation",
from_,
to_,
topic.replace("+", " "),
]
class AdapterRfc(CommandAdapter):
@@ -112,6 +117,7 @@ class AdapterRfc(CommandAdapter):
def is_found(self, topic):
return True
class AdapterOeis(CommandAdapter):
"""
Show OEIS by its number.
@@ -145,13 +151,14 @@ class AdapterOeis(CommandAdapter):
suffix = " :list"
topic = topic[:-6]
topic = re.sub('[^a-zA-Z0-9-:]+', ' ', topic) + suffix
topic = re.sub("[^a-zA-Z0-9-:]+", " ", topic) + suffix
return cmd + [topic]
def is_found(self, topic):
return True
class AdapterChmod(CommandAdapter):
"""
Show chmod numeric values and strings
@@ -170,8 +177,7 @@ class AdapterChmod(CommandAdapter):
# remove all non (alphanumeric, '-') chars
if topic.startswith("chmod/"):
topic = topic[6:]
topic = re.sub('[^a-zA-Z0-9-]', '', topic)
topic = re.sub("[^a-zA-Z0-9-]", "", topic)
return cmd + [topic]
+1 -1
View File
@@ -1,6 +1,6 @@
class Adapter(object):
pass
class cheatAdapter(Adapter):
pass
+31 -25
View File
@@ -5,11 +5,13 @@ Implementation of `GitRepositoryAdapter`, adapter that is used to handle git rep
import glob
import os
from .adapter import Adapter # pylint: disable=relative-import
from .adapter import Adapter # pylint: disable=relative-import
def _get_filenames(path):
return [os.path.split(topic)[1] for topic in glob.glob(path)]
class RepositoryAdapter(Adapter):
"""
Implements methods needed to handle standard
@@ -26,25 +28,26 @@ class RepositoryAdapter(Adapter):
os.path.join(
self.local_repository_location(),
self._cheatsheet_files_prefix,
'*'+self._cheatsheet_files_extension))
"*" + self._cheatsheet_files_extension,
)
)
ext = self._cheatsheet_files_extension
if ext:
answer = [filename[:-len(ext)]
for filename in answer
if filename.endswith(ext)]
answer = [
filename[: -len(ext)] for filename in answer if filename.endswith(ext)
]
return answer
def _get_page(self, topic, request_options=None):
filename = os.path.join(
self.local_repository_location(),
self._cheatsheet_files_prefix,
topic)
self.local_repository_location(), self._cheatsheet_files_prefix, topic
)
if os.path.exists(filename) and not os.path.isdir(filename):
answer = self._format_page(open(filename, 'r').read())
answer = self._format_page(open(filename, "r").read())
else:
# though it should not happen
answer = "%s:%s not found" % (str(self.__class__), topic)
@@ -52,7 +55,7 @@ class RepositoryAdapter(Adapter):
return answer
class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-method
class GitRepositoryAdapter(RepositoryAdapter): # pylint: disable=abstract-method
"""
Implements all methods needed to handle cache handling
for git-repository-based adapters
@@ -69,17 +72,18 @@ class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-meth
if not cls._repository_url:
return None
if not cls._repository_url.startswith('https://github.com/'):
if not cls._repository_url.startswith("https://github.com/"):
# in this case `fetch` has to be implemented
# in the distinct adapter subclass
raise RuntimeError(
"Do not known how to handle this repository: %s" % cls._repository_url)
"Do not known how to handle this repository: %s" % cls._repository_url
)
local_repository_dir = cls.local_repository_location()
if not local_repository_dir:
return None
return ['git', 'clone', '--depth=1', cls._repository_url, local_repository_dir]
return ["git", "clone", "--depth=1", cls._repository_url, local_repository_dir]
@classmethod
def update_command(cls):
@@ -96,13 +100,14 @@ class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-meth
if not local_repository_dir:
return None
if not cls._repository_url.startswith('https://github.com/'):
if not cls._repository_url.startswith("https://github.com/"):
# in this case `update` has to be implemented
# in the distinct adapter subclass
raise RuntimeError(
"Do not known how to handle this repository: %s" % cls._repository_url)
"Do not known how to handle this repository: %s" % cls._repository_url
)
return ['git', 'pull']
return ["git", "pull"]
@classmethod
def current_state_command(cls):
@@ -118,13 +123,14 @@ class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-meth
if not local_repository_dir:
return None
if not cls._repository_url.startswith('https://github.com/'):
if not cls._repository_url.startswith("https://github.com/"):
# in this case `update` has to be implemented
# in the distinct adapter subclass
raise RuntimeError(
"Do not known how to handle this repository: %s" % cls._repository_url)
"Do not known how to handle this repository: %s" % cls._repository_url
)
return ['git', 'rev-parse', '--short', 'HEAD', "--"]
return ["git", "rev-parse", "--short", "HEAD", "--"]
@classmethod
def save_state(cls, state):
@@ -133,8 +139,8 @@ class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-meth
Must be called after the cache clean up.
"""
local_repository_dir = cls.local_repository_location()
state_filename = os.path.join(local_repository_dir, '.cached_revision')
open(state_filename, 'wb').write(state)
state_filename = os.path.join(local_repository_dir, ".cached_revision")
open(state_filename, "wb").write(state)
@classmethod
def get_state(cls):
@@ -144,10 +150,10 @@ class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-meth
"""
local_repository_dir = cls.local_repository_location()
state_filename = os.path.join(local_repository_dir, '.cached_revision')
state_filename = os.path.join(local_repository_dir, ".cached_revision")
state = None
if os.path.exists(state_filename):
state = open(state_filename, 'r').read()
state = open(state_filename, "r").read()
return state
@classmethod
@@ -158,5 +164,5 @@ class GitRepositoryAdapter(RepositoryAdapter): #pylint: disable=abstract-meth
"""
current_state = cls.get_state()
if not current_state:
return ['git', 'ls-tree', '--full-tree', '-r', '--name-only', 'HEAD', "--"]
return ['git', 'diff', '--name-only', current_state, 'HEAD', "--"]
return ["git", "ls-tree", "--full-tree", "-r", "--name-only", "HEAD", "--"]
return ["git", "diff", "--name-only", current_state, "HEAD", "--"]
+47 -35
View File
@@ -11,10 +11,12 @@ import collections
try:
from rapidfuzz import process, fuzz
_USING_FUZZYWUZZY=False
_USING_FUZZYWUZZY = False
except ImportError:
from fuzzywuzzy import process, fuzz
_USING_FUZZYWUZZY=True
_USING_FUZZYWUZZY = True
from config import CONFIG
from .adapter import Adapter
@@ -37,16 +39,17 @@ _INTERNAL_TOPICS = [
":styles-demo",
":vim",
":zsh",
]
]
_COLORIZED_INTERNAL_TOPICS = [
':intro',
":intro",
]
class InternalPages(Adapter):
_adapter_name = 'internal'
_output_format = 'ansi'
_adapter_name = "internal"
_output_format = "ansi"
def __init__(self, get_topic_type=None, get_topics_list=None):
Adapter.__init__(self)
@@ -54,10 +57,9 @@ class InternalPages(Adapter):
self.get_topics_list = get_topics_list
def _get_stat(self):
stat = collections.Counter([
self.get_topic_type(topic)
for topic in self.get_topics_list()
])
stat = collections.Counter(
[self.get_topic_type(topic) for topic in self.get_topics_list()]
)
answer = ""
for key, val in stat.items():
@@ -69,13 +71,15 @@ class InternalPages(Adapter):
return _INTERNAL_TOPICS
def _get_list_answer(self, topic, request_options=None):
if '/' in topic:
topic_type, topic_name = topic.split('/', 1)
if "/" in topic:
topic_type, topic_name = topic.split("/", 1)
if topic_name == ":list":
topic_list = [x[len(topic_type)+1:]
for x in self.get_topics_list()
if x.startswith(topic_type + "/")]
return "\n".join(topic_list)+"\n"
topic_list = [
x[len(topic_type) + 1 :]
for x in self.get_topics_list()
if x.startswith(topic_type + "/")
]
return "\n".join(topic_list) + "\n"
answer = ""
if topic == ":list":
@@ -84,31 +88,31 @@ class InternalPages(Adapter):
return answer
def _get_page(self, topic, request_options=None):
if topic.endswith('/:list') or topic.lstrip('/') == ':list':
if topic.endswith("/:list") or topic.lstrip("/") == ":list":
return self._get_list_answer(topic)
answer = ""
if topic == ':styles':
if topic == ":styles":
answer = "\n".join(CONFIG["frontend.styles"]) + "\n"
elif topic == ":stat":
answer = self._get_stat()+"\n"
answer = self._get_stat() + "\n"
elif topic in _INTERNAL_TOPICS:
answer = open(os.path.join(CONFIG["path.internal.pages"], topic[1:]+".txt"), "r").read()
answer = open(
os.path.join(CONFIG["path.internal.pages"], topic[1:] + ".txt"), "r"
).read()
if topic in _COLORIZED_INTERNAL_TOPICS:
answer = colorize_internal(answer)
return answer
def is_found(self, topic):
return (
topic in self.get_list()
or topic.endswith('/:list')
)
return topic in self.get_list() or topic.endswith("/:list")
class UnknownPages(InternalPages):
_adapter_name = 'unknown'
_output_format = 'text'
_adapter_name = "unknown"
_output_format = "text"
@staticmethod
def get_list(prefix=None):
@@ -120,27 +124,35 @@ class UnknownPages(InternalPages):
def _get_page(self, topic, request_options=None):
topics_list = self.get_topics_list()
if topic.startswith(':'):
topics_list = [x for x in topics_list if x.startswith(':')]
if topic.startswith(":"):
topics_list = [x for x in topics_list if x.startswith(":")]
else:
topics_list = [x for x in topics_list if not x.startswith(':')]
topics_list = [x for x in topics_list if not x.startswith(":")]
if _USING_FUZZYWUZZY:
possible_topics = process.extract(topic, topics_list, scorer=fuzz.ratio)[:3]
else:
possible_topics = process.extract(topic, topics_list, limit=3, scorer=fuzz.ratio)
possible_topics_text = "\n".join([(" * %s %s" % (x[0], int(x[1]))) for x in possible_topics])
return """
possible_topics = process.extract(
topic, topics_list, limit=3, scorer=fuzz.ratio
)
possible_topics_text = "\n".join(
[(" * %s %s" % (x[0], int(x[1]))) for x in possible_topics]
)
return (
"""
Unknown topic.
Do you mean one of these topics maybe?
%s
""" % possible_topics_text
"""
% possible_topics_text
)
class Search(Adapter):
_adapter_name = 'search'
_output_format = 'text'
_adapter_name = "search"
_output_format = "text"
_cache_needed = False
@staticmethod
+5 -4
View File
@@ -12,8 +12,8 @@ import sys
import os
from .git_adapter import GitRepositoryAdapter
class Latenz(GitRepositoryAdapter):
class Latenz(GitRepositoryAdapter):
"""
chubin/late.nz Adapter
"""
@@ -23,12 +23,13 @@ class Latenz(GitRepositoryAdapter):
_repository_url = "https://github.com/chubin/late.nz"
def _get_page(self, topic, request_options=None):
sys.path.append(os.path.join(self.local_repository_location(), 'bin'))
sys.path.append(os.path.join(self.local_repository_location(), "bin"))
import latencies
return latencies.render()
def _get_list(self, prefix=None):
return ['latencies']
return ["latencies"]
def is_found(self, topic):
return topic.lower() in ['latencies', 'late.nz', 'latency']
return topic.lower() in ["latencies", "late.nz", "latency"]
+281 -148
View File
File diff suppressed because it is too large Load Diff
+33 -23
View File
@@ -38,8 +38,8 @@ If the problem persists, file a GitHub issue at
github.com/chubin/cheat.sh or ping @igor_chubin
"""
class Question(UpstreamAdapter):
class Question(UpstreamAdapter):
"""
Answer to a programming language question, using Stackoverflow
as the main data source. Heavy lifting is done by an external
@@ -62,55 +62,65 @@ class Question(UpstreamAdapter):
if not os.path.exists(CONFIG["path.internal.bin.upstream"]):
# if the upstream program is not found, use normal upstream adapter
self._output_format = "ansi"
return UpstreamAdapter._get_page(self, topic, request_options=request_options)
return UpstreamAdapter._get_page(
self, topic, request_options=request_options
)
topic = topic.replace('+', ' ')
topic = topic.replace("+", " ")
# if there is a language name in the section name,
# cut it off (de:python => python)
if '/' in topic:
section_name, topic = topic.split('/', 1)
if ':' in section_name:
_, section_name = section_name.split(':', 1)
if "/" in topic:
section_name, topic = topic.split("/", 1)
if ":" in section_name:
_, section_name = section_name.split(":", 1)
section_name = SO_NAME.get(section_name, section_name)
topic = "%s/%s" % (section_name, topic)
# some clients send queries with - instead of + so we have to rewrite them to
topic = re.sub(r"(?<!-)-", ' ', topic)
topic = re.sub(r"(?<!-)-", " ", topic)
topic_words = topic.split()
topic = " ".join(topic_words)
lang = 'en'
lang = "en"
try:
query_text = topic # " ".join(topic)
query_text = re.sub('^[^/]*/+', '', query_text.rstrip('/'))
query_text = re.sub('/[0-9]+$', '', query_text)
query_text = re.sub('/[0-9]+$', '', query_text)
query_text = topic # " ".join(topic)
query_text = re.sub("^[^/]*/+", "", query_text.rstrip("/"))
query_text = re.sub("/[0-9]+$", "", query_text)
query_text = re.sub("/[0-9]+$", "", query_text)
detector = Detector(query_text)
supposed_lang = detector.languages[0].code
if len(topic_words) > 2 \
or supposed_lang in ['az', 'ru', 'uk', 'de', 'fr', 'es', 'it', 'nl']:
if len(topic_words) > 2 or supposed_lang in [
"az",
"ru",
"uk",
"de",
"fr",
"es",
"it",
"nl",
]:
lang = supposed_lang
if supposed_lang.startswith('zh_') or supposed_lang == 'zh':
lang = 'zh'
elif supposed_lang.startswith('pt_'):
lang = 'pt'
if supposed_lang in ['ja', 'ko']:
if supposed_lang.startswith("zh_") or supposed_lang == "zh":
lang = "zh"
elif supposed_lang.startswith("pt_"):
lang = "pt"
if supposed_lang in ["ja", "ko"]:
lang = supposed_lang
except UnknownLanguage:
print("Unknown language (%s)" % query_text)
if lang != 'en':
topic = ['--human-language', lang, topic]
if lang != "en":
topic = ["--human-language", lang, topic]
else:
topic = [topic]
cmd = [CONFIG["path.internal.bin.upstream"]] + topic
proc = Popen(cmd, stdin=open(os.devnull, "r"), stdout=PIPE, stderr=PIPE)
answer = proc.communicate()[0].decode('utf-8')
answer = proc.communicate()[0].decode("utf-8")
if not answer:
return NOT_FOUND_MESSAGE
+34 -26
View File
@@ -15,8 +15,8 @@ import yaml
from .git_adapter import GitRepositoryAdapter
from .cheat_sheets import CheatSheets
class Rosetta(GitRepositoryAdapter):
class Rosetta(GitRepositoryAdapter):
"""
Adapter for RosettaCode
"""
@@ -36,17 +36,19 @@ class Rosetta(GitRepositoryAdapter):
def _load_rosetta_code_names():
answer = {}
lang_files_location = CheatSheets.local_repository_location(cheat_sheets_location=True)
for filename in glob.glob(os.path.join(lang_files_location, '*/_info.yaml')):
text = open(filename, 'r').read()
lang_files_location = CheatSheets.local_repository_location(
cheat_sheets_location=True
)
for filename in glob.glob(os.path.join(lang_files_location, "*/_info.yaml")):
text = open(filename, "r").read()
data = yaml.load(text, Loader=yaml.SafeLoader)
if data is None:
continue
lang = os.path.basename(os.path.dirname(filename))
if lang.startswith('_'):
if lang.startswith("_"):
lang = lang[1:]
if 'rosetta' in data:
answer[lang] = data['rosetta']
if "rosetta" in data:
answer[lang] = data["rosetta"]
return answer
def _rosetta_get_list(self, query, task=None):
@@ -56,9 +58,13 @@ class Rosetta(GitRepositoryAdapter):
lang = self._rosetta_code_name[query]
answer = []
if task:
glob_path = os.path.join(self.local_repository_location(), 'Lang', lang, task, '*')
glob_path = os.path.join(
self.local_repository_location(), "Lang", lang, task, "*"
)
else:
glob_path = os.path.join(self.local_repository_location(), 'Lang', lang, '*')
glob_path = os.path.join(
self.local_repository_location(), "Lang", lang, "*"
)
for filename in glob.glob(glob_path):
taskname = os.path.basename(filename)
answer.append(taskname)
@@ -68,8 +74,8 @@ class Rosetta(GitRepositoryAdapter):
@staticmethod
def _parse_query(query):
if '/' in query:
task, subquery = query.split('/', 1)
if "/" in query:
task, subquery = query.split("/", 1)
else:
task, subquery = query, None
return task, subquery
@@ -80,9 +86,9 @@ class Rosetta(GitRepositoryAdapter):
task, subquery = self._parse_query(query)
if task == ':list':
if task == ":list":
return self._rosetta_get_list(lang)
if subquery == ':list':
if subquery == ":list":
return self._rosetta_get_list(lang, task=task)
# if it is not a number or the number is too big, just ignore it
@@ -95,41 +101,43 @@ class Rosetta(GitRepositoryAdapter):
lang_name = self._rosetta_code_name[lang]
tasks = sorted(glob.glob(
os.path.join(self.local_repository_location(), 'Lang', lang_name, task, '*')))
tasks = sorted(
glob.glob(
os.path.join(
self.local_repository_location(), "Lang", lang_name, task, "*"
)
)
)
if not tasks:
return ""
if len(tasks) < index or index < 1:
index = 1
answer_filename = tasks[index-1]
answer = open(answer_filename, 'r').read()
answer_filename = tasks[index - 1]
answer = open(answer_filename, "r").read()
return answer
def _starting_page(self, query):
number_of_pages = self._rosetta_get_list(query)
answer = (
"# %s pages available\n"
"# use /:list to list"
) % number_of_pages
answer = ("# %s pages available\n" "# use /:list to list") % number_of_pages
return answer
def _get_page(self, topic, request_options=None):
if '/' not in topic:
if "/" not in topic:
return self._rosetta_get_list(topic)
lang, topic = topic.split('/', 1)
lang, topic = topic.split("/", 1)
# this part should be generalized
# currently we just remove the name of the adapter from the path
if topic == self.__section_name:
return self._starting_page(topic)
if topic.startswith(self.__section_name + '/'):
topic = topic[len(self.__section_name + '/'):]
if topic.startswith(self.__section_name + "/"):
topic = topic[len(self.__section_name + "/") :]
return self._get_task(lang, topic)
@@ -139,7 +147,7 @@ class Rosetta(GitRepositoryAdapter):
def get_list(self, prefix=None):
answer = [self.__section_name]
for i in self._rosetta_code_name:
answer.append('%s/%s/' % (i, self.__section_name))
answer.append("%s/%s/" % (i, self.__section_name))
return answer
def is_found(self, _):
+13 -14
View File
@@ -14,8 +14,8 @@ import os
from .git_adapter import GitRepositoryAdapter
class Tldr(GitRepositoryAdapter):
class Tldr(GitRepositoryAdapter):
"""
tldr-pages/tldr adapter
"""
@@ -41,7 +41,7 @@ class Tldr(GitRepositoryAdapter):
skip_empty = False
header = 2
for line in text.splitlines():
if line.strip() == '':
if line.strip() == "":
if skip_empty and not header:
continue
if header == 1:
@@ -51,17 +51,17 @@ class Tldr(GitRepositoryAdapter):
else:
skip_empty = False
if line.startswith('-'):
line = '# '+line[2:]
if line.startswith("-"):
line = "# " + line[2:]
skip_empty = True
elif line.startswith('> '):
elif line.startswith("> "):
if header == 2:
header = 1
line = '# '+line[2:]
line = "# " + line[2:]
skip_empty = True
elif line.startswith('`') and line.endswith('`'):
elif line.startswith("`") and line.endswith("`"):
line = line[1:-1]
line = re.sub(r'{{(.*?)}}', r'\1', line)
line = re.sub(r"{{(.*?)}}", r"\1", line)
answer.append(line)
@@ -73,23 +73,22 @@ class Tldr(GitRepositoryAdapter):
and as soon as anything is found, format and return it.
"""
search_order = ['common', 'linux', 'osx', 'sunos', 'windows', "android"]
search_order = ["common", "linux", "osx", "sunos", "windows", "android"]
local_rep = self.local_repository_location()
ext = self._cheatsheet_files_extension
filename = None
for subdir in search_order:
_filename = os.path.join(
local_rep, 'pages', subdir, "%s%s" % (topic, ext))
_filename = os.path.join(local_rep, "pages", subdir, "%s%s" % (topic, ext))
if os.path.exists(_filename):
filename = _filename
break
if filename:
answer = self._format_page(open(filename, 'r').read())
answer = self._format_page(open(filename, "r").read())
else:
# though it should not happen
answer = ''
answer = ""
return answer
@@ -104,5 +103,5 @@ class Tldr(GitRepositoryAdapter):
for entry in updated_files_list:
if entry.endswith(ext):
answer.append(entry.split('/')[-1][:-len(ext)])
answer.append(entry.split("/")[-1][: -len(ext)])
return answer
+15 -7
View File
@@ -15,6 +15,7 @@ import requests
from config import CONFIG
from .adapter import Adapter
def _are_you_offline():
return textwrap.dedent(
"""
@@ -32,10 +33,11 @@ def _are_you_offline():
|____|_______|____| the authors to develop it as soon as possible
.
""")
"""
)
class UpstreamAdapter(Adapter):
"""
Connect to the upstream server `CONFIG["upstream.url"]` and fetch
response from it. The response is supposed to have the "ansi" format.
@@ -52,15 +54,21 @@ class UpstreamAdapter(Adapter):
def _get_page(self, topic, request_options=None):
options_string = "&".join(["%s=%s" % (x, y) for (x, y) in request_options.items()])
url = CONFIG["upstream.url"].rstrip('/') \
+ '/' + topic.lstrip('/') \
+ "?" + options_string
options_string = "&".join(
["%s=%s" % (x, y) for (x, y) in request_options.items()]
)
url = (
CONFIG["upstream.url"].rstrip("/")
+ "/"
+ topic.lstrip("/")
+ "?"
+ options_string
)
try:
response = requests.get(url, timeout=CONFIG["upstream.timeout"])
answer = {"cache": False, "answer": response.text}
except requests.exceptions.ConnectionError:
answer = {"cache": False, "answer":_are_you_offline()}
answer = {"cache": False, "answer": _are_you_offline()}
return answer
def _get_list(self, prefix=None):
-1
View File
@@ -17,4 +17,3 @@ GITHUB_BUTTON_FOOTER = """
<!-- Place this tag right after the last button or just before your close body tag. -->
<script async defer id="github-bjs" src="https://buttons.github.io/buttons.js"></script>
"""
+11 -6
View File
@@ -17,17 +17,20 @@ import json
from config import CONFIG
_REDIS = None
if CONFIG['cache.type'] == 'redis':
if CONFIG["cache.type"] == "redis":
import redis
_REDIS = redis.Redis(
host=CONFIG['cache.redis.host'],
port=CONFIG['cache.redis.port'],
db=CONFIG['cache.redis.db'])
_REDIS_PREFIX = ''
_REDIS = redis.Redis(
host=CONFIG["cache.redis.host"],
port=CONFIG["cache.redis.port"],
db=CONFIG["cache.redis.db"],
)
_REDIS_PREFIX = ""
if CONFIG.get("cache.redis.prefix", ""):
_REDIS_PREFIX = CONFIG["cache.redis.prefix"] + ":"
def put(key, value):
"""
Save `value` with `key`, and serialize it if needed
@@ -42,6 +45,7 @@ def put(key, value):
_REDIS.set(key, value)
def get(key):
"""
Read `value` by `key`, and deserialize it if needed
@@ -59,6 +63,7 @@ def get(key):
return value
return None
def delete(key):
"""
Remove `key` from the database
+33 -29
View File
@@ -19,19 +19,21 @@ import postprocessing
import frontend.html
import frontend.ansi
def _add_section_name(query):
# temporary solution before we don't find a fixed one
if ' ' not in query and '+' not in query:
if " " not in query and "+" not in query:
return query
if '/' in query:
if "/" in query:
return query
if ' ' in query:
return re.sub(r' +', '/', query, count=1)
if '+' in query:
if " " in query:
return re.sub(r" +", "/", query, count=1)
if "+" in query:
# replace only single + to avoid catching g++ and friends
return re.sub(r'([^\+])\+([^\+])', r'\1/\2', query, count=1)
return re.sub(r"([^\+])\+([^\+])", r"\1/\2", query, count=1)
def cheat_wrapper(query, request_options=None, output_format='ansi'):
def cheat_wrapper(query, request_options=None, output_format="ansi"):
"""
Function that delivers cheat sheet for `query`.
If `html` is True, the answer is formatted as HTML.
@@ -39,8 +41,8 @@ def cheat_wrapper(query, request_options=None, output_format='ansi'):
"""
def _rewrite_aliases(word):
if word == ':bash.completion':
return ':bash_completion'
if word == ":bash.completion":
return ":bash_completion"
return word
def _rewrite_section_name(query):
@@ -49,22 +51,22 @@ def cheat_wrapper(query, request_options=None, output_format='ansi'):
* EDITOR:NAME => emacs:go-mode
"""
if '/' not in query:
if "/" not in query:
return query
section_name, rest = query.split('/', 1)
section_name, rest = query.split("/", 1)
if ':' in section_name:
if ":" in section_name:
section_name = rewrite_editor_section_name(section_name)
section_name = LANGUAGE_ALIAS.get(section_name, section_name)
return "%s/%s" % (section_name, rest)
def _sanitize_query(query):
return re.sub('[<>"]', '', query)
return re.sub('[<>"]', "", query)
def _strip_hyperlink(query):
return re.sub('(,[0-9]+)+$', '', query)
return re.sub("(,[0-9]+)+$", "", query)
def _parse_query(query):
topic = query
@@ -72,16 +74,16 @@ def cheat_wrapper(query, request_options=None, output_format='ansi'):
search_options = ""
keyword = None
if '~' in query:
if "~" in query:
topic = query
pos = topic.index('~')
keyword = topic[pos+1:]
pos = topic.index("~")
keyword = topic[pos + 1 :]
topic = topic[:pos]
if '/' in keyword:
if "/" in keyword:
search_options = keyword[::-1]
search_options = search_options[:search_options.index('/')]
keyword = keyword[:-len(search_options)-1]
search_options = search_options[: search_options.index("/")]
keyword = keyword[: -len(search_options) - 1]
return topic, keyword, search_options
@@ -97,25 +99,27 @@ def cheat_wrapper(query, request_options=None, output_format='ansi'):
if keyword:
answers = find_answers_by_keyword(
topic, keyword, options=search_options, request_options=request_options)
topic, keyword, options=search_options, request_options=request_options
)
else:
answers = get_answers(topic, request_options=request_options)
answers = [
postprocessing.postprocess(
answer, keyword, search_options, request_options=request_options)
answer, keyword, search_options, request_options=request_options
)
for answer in answers
]
answer_data = {
'query': query,
'keyword': keyword,
'answers': answers,
}
"query": query,
"keyword": keyword,
"answers": answers,
}
if output_format == 'html':
answer_data['topics_list'] = get_topics_list()
if output_format == "html":
answer_data["topics_list"] = get_topics_list()
return frontend.html.visualize(answer_data, request_options)
elif output_format == 'json':
elif output_format == "json":
return json.dumps(answer_data, indent=4)
return frontend.ansi.visualize(answer_data, request_options)
+3 -2
View File
@@ -30,10 +30,11 @@ g++ -O1
g++/-O1
"""
def test_header_split():
for inp in unchanged.strip().splitlines():
assert inp == _add_section_name(inp)
for test in split.strip().split('\n\n'):
inp, outp = test.split('\n')
for test in split.strip().split("\n\n"):
inp, outp = test.split("\n")
assert outp == _add_section_name(inp)
+40 -23
View File
@@ -45,12 +45,14 @@ from __future__ import print_function
import os
from pygments.styles import get_all_styles
#def get_all_styles():
# def get_all_styles():
# return []
_ENV_VAR_PREFIX = "CHEATSH"
_MYDIR = os.path.abspath(os.path.join(__file__, '..', '..'))
_MYDIR = os.path.abspath(os.path.join(__file__, "..", ".."))
def _config_locations():
"""
@@ -59,17 +61,24 @@ def _config_locations():
* `_WORKDIR`, `_CONF_FILE_WORKDIR`, `_CONF_FILE_MYDIR`
"""
var = _ENV_VAR_PREFIX + '_PATH_WORKDIR'
workdir = os.environ[var] if var in os.environ \
else os.path.join(os.environ['HOME'], '.cheat.sh')
var = _ENV_VAR_PREFIX + "_PATH_WORKDIR"
workdir = (
os.environ[var]
if var in os.environ
else os.path.join(os.environ["HOME"], ".cheat.sh")
)
var = _ENV_VAR_PREFIX + '_CONFIG'
conf_file_workdir = os.environ[var] if var in os.environ \
else os.path.join(workdir, 'etc/config.yaml')
var = _ENV_VAR_PREFIX + "_CONFIG"
conf_file_workdir = (
os.environ[var]
if var in os.environ
else os.path.join(workdir, "etc/config.yaml")
)
conf_file_mydir = os.path.join(_MYDIR, 'etc/config.yaml')
conf_file_mydir = os.path.join(_MYDIR, "etc/config.yaml")
return workdir, conf_file_workdir, conf_file_mydir
_WORKDIR, _CONF_FILE_WORKDIR, _CONF_FILE_MYDIR = _config_locations()
_CONFIG = {
@@ -87,10 +96,10 @@ _CONFIG = {
"rfc",
"oeis",
"chmod",
],
],
"adapters.mandatory": [
"search",
],
],
"cache.redis.db": 0,
"cache.redis.host": "localhost",
"cache.redis.port": 6379,
@@ -101,7 +110,9 @@ _CONFIG = {
"path.internal.ansi2html": os.path.join(_MYDIR, "share/ansi2html.sh"),
"path.internal.bin": os.path.join(_MYDIR, "bin"),
"path.internal.bin.upstream": os.path.join(_MYDIR, "bin", "upstream"),
"path.internal.malformed": os.path.join(_MYDIR, "share/static/malformed-response.html"),
"path.internal.malformed": os.path.join(
_MYDIR, "share/static/malformed-response.html"
),
"path.internal.pages": os.path.join(_MYDIR, "share"),
"path.internal.static": os.path.join(_MYDIR, "share/static"),
"path.internal.templates": os.path.join(_MYDIR, "share/templates"),
@@ -121,7 +132,7 @@ _CONFIG = {
("^:", "internal"),
("/:list$", "internal"),
("/$", "cheat.sheets dir"),
],
],
"routing.main": [
("", "cheat.sheets"),
("", "cheat"),
@@ -133,14 +144,15 @@ _CONFIG = {
"routing.post": [
("^[^/ +]*$", "unknown"),
("^[a-z][a-z]-[a-z][a-z]$", "translation"),
],
],
"routing.default": "question",
"upstream.url": "https://cheat.sh",
"upstream.timeout": 5,
"search.limit": 20,
"server.bind": "0.0.0.0",
"server.port": 8002,
}
}
class Config(dict):
"""
@@ -149,16 +161,16 @@ class Config(dict):
"""
def _absolute_path(self, val):
if val.startswith('/'):
if val.startswith("/"):
return val
return os.path.join(self['path.workdir'], val)
return os.path.join(self["path.workdir"], val)
def __init__(self, *args, **kwargs):
dict.__init__(self)
self.update(*args, **kwargs)
def __setitem__(self, key, val):
if key.startswith('path.') and not val.startswith('/'):
if key.startswith("path.") and not val.startswith("/"):
val = self._absolute_path(val)
dict.__setitem__(self, key, val)
@@ -170,12 +182,13 @@ class Config(dict):
"""
newdict = dict(*args, **kwargs)
if 'path.workdir' in newdict:
self['path.workdir'] = newdict['path.workdir']
if "path.workdir" in newdict:
self["path.workdir"] = newdict["path.workdir"]
for key, val in newdict.items():
self[key] = val
def _load_config_from_environ(config):
update = {}
@@ -183,7 +196,7 @@ def _load_config_from_environ(config):
if not isinstance(val, str) or isinstance(val, int):
continue
env_var = _ENV_VAR_PREFIX + '_' + key.replace('.', '_').upper()
env_var = _ENV_VAR_PREFIX + "_" + key.replace(".", "_").upper()
if not env_var in os.environ:
continue
@@ -198,6 +211,7 @@ def _load_config_from_environ(config):
return update
def _get_nested(data, key):
"""
Return value for a hierrachical key (like a.b.c).
@@ -215,12 +229,12 @@ def _get_nested(data, key):
if not data or not isinstance(data, dict):
return None
if '.' not in key:
if "." not in key:
return data.get(key)
if key in data:
return data[key]
parts = key.split('.')
parts = key.split(".")
for i in range(len(parts))[::-1]:
prefix = ".".join(parts[:i])
if prefix in data:
@@ -228,6 +242,7 @@ def _get_nested(data, key):
return None
def _load_config_from_file(default_config, filename):
import yaml
@@ -252,6 +267,7 @@ def _load_config_from_file(default_config, filename):
return update
CONFIG = Config()
CONFIG.update(_CONFIG)
CONFIG.update(_load_config_from_file(_CONFIG, _CONF_FILE_MYDIR))
@@ -261,4 +277,5 @@ CONFIG.update(_load_config_from_environ(_CONFIG))
if __name__ == "__main__":
import doctest
doctest.testmod()
+54 -19
View File
@@ -24,6 +24,7 @@ import cache
from config import CONFIG
def _log(*message):
logging.info(*message)
if len(message) > 1:
@@ -31,15 +32,18 @@ def _log(*message):
else:
message = message[0].rstrip("\n")
sys.stdout.write(message+"\n")
sys.stdout.write(message + "\n")
def _run_cmd(cmd):
shell = isinstance(cmd, str)
process = subprocess.Popen(
cmd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
cmd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
output = process.communicate()[0]
return process.returncode, output
def fetch_all(skip_existing=True):
"""
Fetch all known repositories mentioned in the adapters
@@ -58,8 +62,11 @@ def fetch_all(skip_existing=True):
sys.stdout.flush()
try:
process = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True)
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
except OSError:
print("\nERROR: %s" % cmd)
raise
@@ -76,10 +83,14 @@ def fetch_all(skip_existing=True):
location = adptr.local_repository_location()
if not location:
continue
if location in known_location \
and adptr.repository_url() != known_location[location].repository_url():
fatal("Duplicate location: %s for %s and %s"
% (location, adptr, known_location[location]))
if (
location in known_location
and adptr.repository_url() != known_location[location].repository_url()
):
fatal(
"Duplicate location: %s for %s and %s"
% (location, adptr, known_location[location])
)
known_location[location] = adptr
# Parent directories creation
@@ -101,9 +112,12 @@ def fetch_all(skip_existing=True):
os.makedirs(parent)
known_location = {k:v for k, v in known_location.items() if k not in existing_locations}
known_location = {
k: v for k, v in known_location.items() if k not in existing_locations
}
_fetch_locations(known_location)
def _update_adapter(adptr):
"""
Update implementation.
@@ -118,7 +132,10 @@ def _update_adapter(adptr):
errorcode, output = _run_cmd(cmd)
if errorcode:
_log("\nERROR:\n---%s\n" % output.decode("utf-8") + "\n---\nCould not update %s" % adptr)
_log(
"\nERROR:\n---%s\n" % output.decode("utf-8")
+ "\n---\nCould not update %s" % adptr
)
return False
# Getting current repository state
@@ -129,7 +146,11 @@ def _update_adapter(adptr):
if cmd:
errorcode, state = _run_cmd(cmd)
if errorcode:
_log("\nERROR:\n---\n" + state + "\n---\nCould not get repository state: %s" % adptr)
_log(
"\nERROR:\n---\n"
+ state
+ "\n---\nCould not get repository state: %s" % adptr
)
return False
state = state.strip()
@@ -141,7 +162,11 @@ def _update_adapter(adptr):
errorcode, output = _run_cmd(cmd)
output = output.decode("utf-8")
if errorcode:
_log("\nERROR:\n---\n" + output + "\n---\nCould not get list of pages to be updated: %s" % adptr)
_log(
"\nERROR:\n---\n"
+ output
+ "\n---\nCould not get list of pages to be updated: %s" % adptr
)
return False
updates = output.splitlines()
@@ -161,6 +186,7 @@ def _update_adapter(adptr):
adptr.save_state(state)
return True
def update_all():
"""
Update all known repositories, mentioned in the adapters
@@ -177,14 +203,18 @@ def update_all():
_update_adapter(adptr)
def update_by_name(name):
"""
Find adapter by its `name` and update only it.
"""
pass
def _show_usage():
sys.stdout.write(textwrap.dedent("""
sys.stdout.write(
textwrap.dedent(
"""
Usage:
python lib/fetch.py [command]
@@ -195,7 +225,10 @@ def _show_usage():
update [name] -- update repository of the adapter `name`
fetch-all -- fetch all configured repositories
"""))
"""
)
)
def main(args):
"""
@@ -213,17 +246,19 @@ def main(args):
logging.basicConfig(
filename=CONFIG["path.log.fetch"],
level=logging.DEBUG,
format='%(asctime)s %(message)s')
format="%(asctime)s %(message)s",
)
if args[0] == 'fetch-all':
if args[0] == "fetch-all":
fetch_all()
elif args[0] == 'update':
elif args[0] == "update":
update_by_name(sys.argv[1])
elif args[0] == 'update-all':
elif args[0] == "update-all":
update_all()
else:
_show_usage()
sys.exit(0)
if __name__ == '__main__':
if __name__ == "__main__":
main(sys.argv[1:])
+76 -51
View File
@@ -33,29 +33,33 @@ from config import CONFIG
from languages_data import VIM_NAME
import cache
FNULL = open(os.devnull, 'w')
FNULL = open(os.devnull, "w")
TEXT = 0
CODE = 1
UNDEFINED = -1
CODE_WHITESPACE = -2
def _language_name(name):
return VIM_NAME.get(name, name)
def _remove_empty_lines_from_beginning(lines):
start = 0
while start < len(lines) and lines[start].strip() == '':
while start < len(lines) and lines[start].strip() == "":
start += 1
lines = lines[start:]
return lines
def _remove_empty_lines_from_end(lines):
end = len(lines) - 1
while end >= 0 and lines[end].strip() == '':
while end >= 0 and lines[end].strip() == "":
end -= 1
lines = lines[:end+1]
lines = lines[: end + 1]
return lines
def _cleanup_lines(lines):
"""
Cleanup `lines` a little bit: remove empty lines at the beginning
@@ -66,9 +70,14 @@ def _cleanup_lines(lines):
if lines == []:
return lines
# remove repeating empty lines
lines = list(chain.from_iterable(
[(list(x[1]) if x[0] else [''])
for x in groupby(lines, key=lambda x: x.strip() != '')]))
lines = list(
chain.from_iterable(
[
(list(x[1]) if x[0] else [""])
for x in groupby(lines, key=lambda x: x.strip() != "")
]
)
)
return lines
@@ -89,31 +98,32 @@ def _line_type(line):
or if it is the first/last line and it has
code on the other side.
"""
if line.strip() == '':
if line.strip() == "":
return UNDEFINED
# some line may start with spaces but still be not code.
# we need some heuristics here, but for the moment just
# whitelist such cases:
if line.strip().startswith('* ') or re.match(r'[0-9]+\.', line.strip()):
if line.strip().startswith("* ") or re.match(r"[0-9]+\.", line.strip()):
return TEXT
if line.startswith(' '):
if line.startswith(" "):
return CODE
return TEXT
def _classify_lines(lines):
line_types = [_line_type(line) for line in lines]
# pass 2:
# adding empty code lines to the code
for i in range(len(line_types) - 1):
if line_types[i] == CODE and line_types[i+1] == UNDEFINED:
line_types[i+1] = CODE_WHITESPACE
if line_types[i] == CODE and line_types[i + 1] == UNDEFINED:
line_types[i + 1] = CODE_WHITESPACE
changed = True
for i in range(len(line_types) - 1)[::-1]:
if line_types[i] == UNDEFINED and line_types[i+1] == CODE:
if line_types[i] == UNDEFINED and line_types[i + 1] == CODE:
line_types[i] = CODE_WHITESPACE
changed = True
line_types = [CODE if x == CODE_WHITESPACE else x for x in line_types]
@@ -127,12 +137,12 @@ def _classify_lines(lines):
# changing all lines types that are near the text
for i in range(len(line_types) - 1):
if line_types[i] == TEXT and line_types[i+1] == UNDEFINED:
line_types[i+1] = TEXT
if line_types[i] == TEXT and line_types[i + 1] == UNDEFINED:
line_types[i + 1] = TEXT
changed = True
for i in range(len(line_types) - 1)[::-1]:
if line_types[i] == UNDEFINED and line_types[i+1] == TEXT:
if line_types[i] == UNDEFINED and line_types[i + 1] == TEXT:
line_types[i] = TEXT
changed = True
@@ -140,15 +150,17 @@ def _classify_lines(lines):
line_types = [CODE if x == UNDEFINED else x for x in line_types]
return line_types
def _unindent_code(line, shift=0):
if shift == -1 and line != '':
return ' ' + line
if shift > 0 and line.startswith(' '*shift):
def _unindent_code(line, shift=0):
if shift == -1 and line != "":
return " " + line
if shift > 0 and line.startswith(" " * shift):
return line[shift:]
return line
def _wrap_lines(lines_classes, unindent_code=False):
"""
Wrap classified lines. Add the split lines to the stream.
@@ -169,6 +181,7 @@ def _wrap_lines(lines_classes, unindent_code=False):
return result
def _run_vim_script(script_lines, text_lines):
"""
Apply `script_lines` to `lines_classes`
@@ -185,34 +198,43 @@ def _run_vim_script(script_lines, text_lines):
textfile.file.close()
my_env = os.environ.copy()
my_env['HOME'] = CONFIG["path.internal.vim"]
my_env["HOME"] = CONFIG["path.internal.vim"]
cmd = ["script", "-q", "-c",
"vim -S %s %s" % (script_vim.name, textfile.name)]
cmd = ["script", "-q", "-c", "vim -S %s %s" % (script_vim.name, textfile.name)]
Popen(cmd, shell=False,
stdin=open(os.devnull, 'r'),
stdout=FNULL, stderr=FNULL, env=my_env).communicate()
Popen(
cmd,
shell=False,
stdin=open(os.devnull, "r"),
stdout=FNULL,
stderr=FNULL,
env=my_env,
).communicate()
return open(textfile.name, "r").read()
def _commenting_script(lines_blocks, filetype):
script_lines = []
block_start = 1
for block in lines_blocks:
lines = list(block[1])
block_end = block_start + len(lines)-1
block_end = block_start + len(lines) - 1
if block[0] == 0:
comment_type = 'sexy'
if block_end - block_start < 1 or filetype == 'ruby':
comment_type = 'comment'
comment_type = "sexy"
if block_end - block_start < 1 or filetype == "ruby":
comment_type = "comment"
script_lines.insert(0, "%s,%s call NERDComment(1, '%s')"
% (block_start, block_end, comment_type))
script_lines.insert(0, "%s,%s call NERDComment(1, 'uncomment')"
% (block_start, block_end))
script_lines.insert(
0,
"%s,%s call NERDComment(1, '%s')"
% (block_start, block_end, comment_type),
)
script_lines.insert(
0, "%s,%s call NERDComment(1, 'uncomment')" % (block_start, block_end)
)
block_start = block_end + 1
@@ -221,6 +243,7 @@ def _commenting_script(lines_blocks, filetype):
return script_lines
def _beautify(text, filetype, add_comments=False, remove_text=False):
"""
Main function that actually does the whole beautification job.
@@ -230,7 +253,7 @@ def _beautify(text, filetype, add_comments=False, remove_text=False):
# or remove the text completely. Otherwise the code has to remain aligned
unindent_code = add_comments or remove_text
lines = [x.decode("utf-8").rstrip('\n') for x in text.splitlines()]
lines = [x.decode("utf-8").rstrip("\n") for x in text.splitlines()]
lines = _cleanup_lines(lines)
lines_classes = zip(_classify_lines(lines), lines)
lines_classes = _wrap_lines(lines_classes, unindent_code=unindent_code)
@@ -239,34 +262,33 @@ def _beautify(text, filetype, add_comments=False, remove_text=False):
lines = [line[1] for line in lines_classes if line[0] == 1]
lines = _cleanup_lines(lines)
output = "\n".join(lines)
if not output.endswith('\n'):
if not output.endswith("\n"):
output += "\n"
elif not add_comments:
output = "\n".join(line[1] for line in lines_classes)
else:
lines_blocks = groupby(lines_classes, key=lambda x: x[0])
script_lines = _commenting_script(lines_blocks, filetype)
output = _run_vim_script(
script_lines,
[line for (_, line) in lines_classes])
output = _run_vim_script(script_lines, [line for (_, line) in lines_classes])
return output
def code_blocks(text, wrap_lines=False, unindent_code=False):
"""
Split `text` into blocks of text and code.
Return list of tuples TYPE, TEXT
"""
text = text.encode('utf-8')
text = text.encode("utf-8")
lines = [x.rstrip('\n') for x in text.splitlines()]
lines = [x.rstrip("\n") for x in text.splitlines()]
lines_classes = zip(_classify_lines(lines), lines)
if wrap_lines:
lines_classes = _wrap_lines(lines_classes, unindent_code=unindent_code)
lines_blocks = groupby(lines_classes, key=lambda x: x[0])
answer = [(x[0], "\n".join([y[1] for y in x[1]])+"\n") for x in lines_blocks]
answer = [(x[0], "\n".join([y[1] for y in x[1]]) + "\n") for x in lines_blocks]
return answer
@@ -279,21 +301,22 @@ def beautify(text, lang, options):
"""
options = options or {}
beauty_options = dict((k, v) for k, v in options.items() if k in
['add_comments', 'remove_text'])
beauty_options = dict(
(k, v) for k, v in options.items() if k in ["add_comments", "remove_text"]
)
mode = ''
if beauty_options.get('add_comments'):
mode += 'c'
if beauty_options.get('remove_text'):
mode += 'q'
mode = ""
if beauty_options.get("add_comments"):
mode += "c"
if beauty_options.get("remove_text"):
mode += "q"
if beauty_options == {}:
# if mode is unknown, just don't transform the text at all
return text
if isinstance(text, str):
text = text.encode('utf-8')
text = text.encode("utf-8")
digest = "t:%s:%s:%s" % (hashlib.md5(text).hexdigest(), lang, mode)
# temporary added line that removes invalid cache entries
@@ -309,6 +332,7 @@ def beautify(text, lang, options):
return answer
def __main__():
text = sys.stdin.read()
filetype = sys.argv[1]
@@ -321,5 +345,6 @@ def __main__():
result = beautify(text, filetype, options)
sys.stdout.write(result)
if __name__ == '__main__':
if __name__ == "__main__":
__main__()
+41 -30
View File
@@ -16,7 +16,7 @@ PALETTES = {
1: {
1: Fore.CYAN,
2: Fore.GREEN,
3: colored.fg('orange_3'),
3: colored.fg("orange_3"),
4: Style.DIM,
5: Style.DIM,
},
@@ -27,12 +27,9 @@ PALETTES = {
}
def _reverse_palette(code):
return {
1 : Fore.BLACK + _back_color(code),
2 : Style.DIM
}
return {1: Fore.BLACK + _back_color(code), 2: Style.DIM}
def _back_color(code):
if code == 0 or (isinstance(code, str) and code.lower() == "white"):
@@ -44,6 +41,7 @@ def _back_color(code):
return Back.WHITE
def colorize_internal(text, palette_number=1):
"""
Colorize `text`, use `palette`
@@ -51,26 +49,27 @@ def colorize_internal(text, palette_number=1):
palette = PALETTES[palette_number]
palette_reverse = _reverse_palette(palette_number)
def _process_text(text):
text = text.group()[1:-1]
factor = 1
if text.startswith('-'):
if text.startswith("-"):
text = text[1:]
factor = -1
stripped = text.lstrip('0123456789')
stripped = text.lstrip("0123456789")
return (text, stripped, factor)
def _extract_color_number(text, stripped, factor=1):
return int(text[:len(text)-len(stripped)])*factor
return int(text[: len(text) - len(stripped)]) * factor
def _colorize_curlies_block(text):
text, stripped, factor = _process_text(text)
color_number = _extract_color_number(text, stripped, factor)
if stripped.startswith('='):
if stripped.startswith("="):
stripped = stripped[1:]
reverse = (color_number < 0)
reverse = color_number < 0
if reverse:
color_number = -color_number
@@ -82,10 +81,10 @@ def colorize_internal(text, palette_number=1):
return stripped
def _colorize_headers(text):
if text.group(0).endswith('\n'):
newline = '\n'
if text.group(0).endswith("\n"):
newline = "\n"
else:
newline = ''
newline = ""
color_number = 3
return palette[color_number] + text.group(0).strip() + Style.RESET_ALL + newline
@@ -94,6 +93,7 @@ def colorize_internal(text, palette_number=1):
text = re.sub("#(.*?)\n", _colorize_headers, text)
return text
def colorize_internal_firstpage_v1(answer):
"""
Colorize "/:firstpage-v1".
@@ -101,28 +101,39 @@ def colorize_internal_firstpage_v1(answer):
"""
def _colorize_line(line):
if line.startswith('T'):
line = colored.fg("grey_62") + line + colored.attr('reset')
line = re.sub(r"\{(.*?)\}", colored.fg("orange_3") + r"\1"+colored.fg('grey_35'), line)
if line.startswith("T"):
line = colored.fg("grey_62") + line + colored.attr("reset")
line = re.sub(
r"\{(.*?)\}",
colored.fg("orange_3") + r"\1" + colored.fg("grey_35"),
line,
)
return line
line = re.sub(r"\[(F.*?)\]",
colored.bg("black") + colored.fg("cyan") + r"[\1]"+colored.attr('reset'),
line)
line = re.sub(r"\[(g.*?)\]",
colored.bg("dark_gray")+colored.fg("grey_0")+r"[\1]"+colored.attr('reset'),
line)
line = re.sub(r"\{(.*?)\}",
colored.fg("orange_3") + r"\1"+colored.attr('reset'),
line)
line = re.sub(r"<(.*?)>",
colored.fg("cyan") + r"\1"+colored.attr('reset'),
line)
line = re.sub(
r"\[(F.*?)\]",
colored.bg("black") + colored.fg("cyan") + r"[\1]" + colored.attr("reset"),
line,
)
line = re.sub(
r"\[(g.*?)\]",
colored.bg("dark_gray")
+ colored.fg("grey_0")
+ r"[\1]"
+ colored.attr("reset"),
line,
)
line = re.sub(
r"\{(.*?)\}", colored.fg("orange_3") + r"\1" + colored.attr("reset"), line
)
line = re.sub(
r"<(.*?)>", colored.fg("cyan") + r"\1" + colored.attr("reset"), line
)
return line
lines = answer.splitlines()
answer_lines = lines[:9]
answer_lines.append(colored.fg('grey_35')+lines[9]+colored.attr('reset'))
answer_lines.append(colored.fg("grey_35") + lines[9] + colored.attr("reset"))
for line in lines[10:]:
answer_lines.append(_colorize_line(line))
answer = "\n".join(answer_lines) + "\n"
+37 -31
View File
@@ -11,6 +11,7 @@ import re
import ansiwrap
import colored
def format_text(text, config=None, highlighter=None):
"""
Renders `text` according to markdown rules.
@@ -19,38 +20,42 @@ def format_text(text, config=None, highlighter=None):
"""
return _format_section(text, config=config, highlighter=highlighter)
def _split_into_paragraphs(text):
return re.split('\n\n+', text)
return re.split("\n\n+", text)
def _colorize(text):
return \
return re.sub(
r"`(.*?)`",
colored.bg("dark_gray")
+ colored.fg("white")
+ " "
+ r"\1"
+ " "
+ colored.attr("reset"),
re.sub(
r"`(.*?)`",
colored.bg("dark_gray") \
+ colored.fg("white") \
+ " " + r"\1" + " " \
+ colored.attr('reset'),
re.sub(
r"\*\*(.*?)\*\*",
colored.attr('bold') \
+ colored.fg("white") \
+ r"\1" \
+ colored.attr('reset'),
text))
r"\*\*(.*?)\*\*",
colored.attr("bold") + colored.fg("white") + r"\1" + colored.attr("reset"),
text,
),
)
def _format_section(section_text, config=None, highlighter=None):
answer = ''
answer = ""
# cut code blocks
block_number = 0
while True:
section_text, replacements = re.subn(
'^```.*?^```',
'MULTILINE_BLOCK_%s' % block_number,
"^```.*?^```",
"MULTILINE_BLOCK_%s" % block_number,
section_text,
1,
flags=re.S | re.MULTILINE)
flags=re.S | re.MULTILINE,
)
block_number += 1
if not replacements:
break
@@ -58,32 +63,33 @@ def _format_section(section_text, config=None, highlighter=None):
# cut links
links = []
while True:
regexp = re.compile(r'\[(.*?)\]\((.*?)\)')
regexp = re.compile(r"\[(.*?)\]\((.*?)\)")
match = regexp.search(section_text)
if match:
links.append(match.group(0))
text = match.group(1)
# links are not yet supported
#
text = '\x1B]8;;%s\x1B\\\\%s\x1B]8;;\x1B\\\\' % (match.group(2), match.group(1))
text = "\x1b]8;;%s\x1b\\\\%s\x1b]8;;\x1b\\\\" % (
match.group(2),
match.group(1),
)
else:
break
section_text, replacements = regexp.subn(
text, # 'LINK_%s' % len(links),
section_text,
1)
text, section_text, 1 # 'LINK_%s' % len(links),
)
block_number += 1
if not replacements:
break
for paragraph in _split_into_paragraphs(section_text):
answer += "\n".join(
ansiwrap.fill(_colorize(line)) + "\n"
for line in paragraph.splitlines()) + "\n"
answer += (
"\n".join(
ansiwrap.fill(_colorize(line)) + "\n" for line in paragraph.splitlines()
)
+ "\n"
)
return {
'ansi': answer,
'links': links
}
return {"ansi": answer, "links": links}
+87 -46
View File
@@ -28,78 +28,107 @@ import re
import colored
from pygments import highlight as pygments_highlight
from pygments.formatters import Terminal256Formatter # pylint: disable=no-name-in-module
# pylint: disable=wrong-import-position
sys.path.append(os.path.abspath(os.path.join(__file__, '..')))
from pygments.formatters import (
Terminal256Formatter,
) # pylint: disable=no-name-in-module
# pylint: disable=wrong-import-position
sys.path.append(os.path.abspath(os.path.join(__file__, "..")))
from config import CONFIG
import languages_data # pylint: enable=wrong-import-position
import languages_data # pylint: enable=wrong-import-position
import fmt.internal
import fmt.comments
def visualize(answer_data, request_options):
"""
Renders `answer_data` as ANSI output.
"""
answers = answer_data['answers']
return _visualize(answers, request_options, search_mode=bool(answer_data['keyword']))
answers = answer_data["answers"]
return _visualize(
answers, request_options, search_mode=bool(answer_data["keyword"])
)
ANSI_ESCAPE = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")
ANSI_ESCAPE = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
def remove_ansi(sometext):
"""
Remove ANSI sequences from `sometext` and convert it into plaintext.
"""
return ANSI_ESCAPE.sub('', sometext)
return ANSI_ESCAPE.sub("", sometext)
def _limited_answer(answer):
return colored.bg('dark_goldenrod') + colored.fg('yellow_1') \
+ ' ' + answer + ' ' \
+ colored.attr('reset') + "\n"
return (
colored.bg("dark_goldenrod")
+ colored.fg("yellow_1")
+ " "
+ answer
+ " "
+ colored.attr("reset")
+ "\n"
)
def _colorize_ansi_answer(topic, answer, color_style, # pylint: disable=too-many-arguments
highlight_all=True, highlight_code=False,
unindent_code=False, language=None):
def _colorize_ansi_answer(
topic,
answer,
color_style, # pylint: disable=too-many-arguments
highlight_all=True,
highlight_code=False,
unindent_code=False,
language=None,
):
color_style = color_style or "native"
lexer_class = languages_data.LEXER['bash']
if '/' in topic:
lexer_class = languages_data.LEXER["bash"]
if "/" in topic:
if language is None:
section_name = topic.split('/', 1)[0].lower()
section_name = topic.split("/", 1)[0].lower()
else:
section_name = language
section_name = languages_data.get_lexer_name(section_name)
lexer_class = languages_data.LEXER.get(section_name, lexer_class)
if section_name == 'php':
if section_name == "php":
answer = "<?\n%s?>\n" % answer
if highlight_all:
highlight = lambda answer: pygments_highlight(
answer, lexer_class(), Terminal256Formatter(style=color_style)).strip('\n')+'\n'
highlight = (
lambda answer: pygments_highlight(
answer, lexer_class(), Terminal256Formatter(style=color_style)
).strip("\n")
+ "\n"
)
else:
highlight = lambda x: x
if highlight_code:
blocks = fmt.comments.code_blocks(
answer, wrap_lines=True, unindent_code=(4 if unindent_code else False))
answer, wrap_lines=True, unindent_code=(4 if unindent_code else False)
)
highlighted_blocks = []
for block in blocks:
if block[0] == 1:
this_block = highlight(block[1])
else:
this_block = block[1].strip('\n')+'\n'
this_block = block[1].strip("\n") + "\n"
highlighted_blocks.append(this_block)
result = "\n".join(highlighted_blocks)
else:
result = highlight(answer).lstrip('\n')
result = highlight(answer).lstrip("\n")
return result
def _visualize(answers, request_options, search_mode=False):
highlight = not bool(request_options and request_options.get('no-terminal'))
color_style = (request_options or {}).get('style', '')
if color_style not in CONFIG['frontend.styles']:
color_style = ''
highlight = not bool(request_options and request_options.get("no-terminal"))
color_style = (request_options or {}).get("style", "")
if color_style not in CONFIG["frontend.styles"]:
color_style = ""
# if there is more than one answer,
# show the source of the answer
@@ -108,39 +137,51 @@ def _visualize(answers, request_options, search_mode=False):
found = True
result = ""
for answer_dict in answers:
topic = answer_dict['topic']
topic_type = answer_dict['topic_type']
answer = answer_dict['answer']
found = found and not topic_type == 'unknown'
topic = answer_dict["topic"]
topic_type = answer_dict["topic_type"]
answer = answer_dict["answer"]
found = found and not topic_type == "unknown"
if multiple_answers and topic != 'LIMITED':
if multiple_answers and topic != "LIMITED":
section_name = f"{topic_type}:{topic}"
if not highlight:
result += f"#[{section_name}]\n"
else:
result += "".join([
"\n", colored.bg('dark_gray'), colored.attr("res_underlined"),
f" {section_name} ",
colored.attr("res_underlined"), colored.attr('reset'), "\n"])
result += "".join(
[
"\n",
colored.bg("dark_gray"),
colored.attr("res_underlined"),
f" {section_name} ",
colored.attr("res_underlined"),
colored.attr("reset"),
"\n",
]
)
if answer_dict['format'] in ['ansi', 'text']:
if answer_dict["format"] in ["ansi", "text"]:
result += answer
elif topic == ':firstpage-v1':
elif topic == ":firstpage-v1":
result += fmt.internal.colorize_internal_firstpage_v1(answer)
elif topic == 'LIMITED':
elif topic == "LIMITED":
result += _limited_answer(topic)
else:
result += _colorize_ansi_answer(
topic, answer, color_style,
topic,
answer,
color_style,
highlight_all=highlight,
highlight_code=(topic_type == 'question'
and not request_options.get('add_comments')
and not request_options.get('remove_text')),
language=answer_dict.get("filetype"))
highlight_code=(
topic_type == "question"
and not request_options.get("add_comments")
and not request_options.get("remove_text")
),
language=answer_dict.get("filetype"),
)
if request_options.get('no-terminal'):
if request_options.get("no-terminal"):
result = remove_ansi(result)
result = result.strip('\n') + "\n"
result = result.strip("\n") + "\n"
return result, found
+75 -52
View File
@@ -10,7 +10,7 @@ import os
import re
from subprocess import Popen, PIPE
MYDIR = os.path.abspath(os.path.join(__file__, '..', '..'))
MYDIR = os.path.abspath(os.path.join(__file__, "..", ".."))
sys.path.append("%s/lib/" % MYDIR)
# pylint: disable=wrong-import-position
@@ -22,37 +22,44 @@ import frontend.ansi
# temporary having it here, but actually we have the same data
# in the adapter module
GITHUB_REPOSITORY = {
"late.nz" : 'chubin/late.nz',
"cheat.sheets" : 'chubin/cheat.sheets',
"cheat.sheets dir" : 'chubin/cheat.sheets',
"tldr" : 'tldr-pages/tldr',
"cheat" : 'chrisallenlane/cheat',
"learnxiny" : 'adambard/learnxinyminutes-docs',
"internal" : '',
"search" : '',
"unknown" : '',
"late.nz": "chubin/late.nz",
"cheat.sheets": "chubin/cheat.sheets",
"cheat.sheets dir": "chubin/cheat.sheets",
"tldr": "tldr-pages/tldr",
"cheat": "chrisallenlane/cheat",
"learnxiny": "adambard/learnxinyminutes-docs",
"internal": "",
"search": "",
"unknown": "",
}
def visualize(answer_data, request_options):
query = answer_data['query']
answers = answer_data['answers']
topics_list = answer_data['topics_list']
editable = (len(answers) == 1 and answers[0]['topic_type'] == 'cheat.sheets')
repository_button = ''
def visualize(answer_data, request_options):
query = answer_data["query"]
answers = answer_data["answers"]
topics_list = answer_data["topics_list"]
editable = len(answers) == 1 and answers[0]["topic_type"] == "cheat.sheets"
repository_button = ""
if len(answers) == 1:
repository_button = _github_button(answers[0]['topic_type'])
repository_button = _github_button(answers[0]["topic_type"])
result, found = frontend.ansi.visualize(answer_data, request_options)
return _render_html(query, result, editable, repository_button, topics_list, request_options), found
return (
_render_html(
query, result, editable, repository_button, topics_list, request_options
),
found,
)
def _github_button(topic_type):
full_name = GITHUB_REPOSITORY.get(topic_type, '')
full_name = GITHUB_REPOSITORY.get(topic_type, "")
if not full_name:
return ''
return ""
short_name = full_name.split('/', 1)[1] # pylint: disable=unused-variable
short_name = full_name.split("/", 1)[1] # pylint: disable=unused-variable
button = (
"<!-- Place this tag where you want the button to render. -->"
@@ -66,62 +73,78 @@ def _github_button(topic_type):
) % locals()
return button
def _render_html(query, result, editable, repository_button, topics_list, request_options):
def _render_html(
query, result, editable, repository_button, topics_list, request_options
):
def _html_wrapper(data):
"""
Convert ANSI text `data` to HTML
"""
cmd = ["bash", CONFIG['path.internal.ansi2html'], "--palette=solarized", "--bg=dark"]
cmd = [
"bash",
CONFIG["path.internal.ansi2html"],
"--palette=solarized",
"--bg=dark",
]
try:
proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except FileNotFoundError:
print("ERROR: %s" % cmd)
raise
data = data.encode('utf-8')
data = data.encode("utf-8")
stdout, stderr = proc.communicate(data)
if proc.returncode != 0:
error((stdout + stderr).decode('utf-8'))
return stdout.decode('utf-8')
error((stdout + stderr).decode("utf-8"))
return stdout.decode("utf-8")
result = result + "\n$"
result = _html_wrapper(result)
title = "<title>cheat.sh/%s</title>" % query
submit_button = ('<input type="submit" style="position: absolute;'
' left: -9999px; width: 1px; height: 1px;" tabindex="-1" />')
topic_list = ('<datalist id="topics">%s</datalist>'
% ("\n".join("<option value='%s'></option>" % x for x in topics_list)))
submit_button = (
'<input type="submit" style="position: absolute;'
' left: -9999px; width: 1px; height: 1px;" tabindex="-1" />'
)
topic_list = '<datalist id="topics">%s</datalist>' % (
"\n".join("<option value='%s'></option>" % x for x in topics_list)
)
curl_line = "<span class='pre'>$ curl cheat.sh/</span>"
if query == ':firstpage':
if query == ":firstpage":
query = ""
form_html = ('<form action="/" method="GET">'
'%s%s'
'<input'
' type="text" value="%s" name="topic"'
' list="topics" autofocus autocomplete="off"/>'
'%s'
'</form>') \
% (submit_button, curl_line, query, topic_list)
form_html = (
'<form action="/" method="GET">'
"%s%s"
"<input"
' type="text" value="%s" name="topic"'
' list="topics" autofocus autocomplete="off"/>'
"%s"
"</form>"
) % (submit_button, curl_line, query, topic_list)
edit_button = ''
edit_button = ""
if editable:
# It's possible that topic directory starts with omitted underscore
if '/' in query:
query = '_' + query
edit_page_link = 'https://github.com/chubin/cheat.sheets/edit/master/sheets/' + query
if "/" in query:
query = "_" + query
edit_page_link = (
"https://github.com/chubin/cheat.sheets/edit/master/sheets/" + query
)
edit_button = (
'<pre style="position:absolute;padding-left:40em;overflow:visible;height:0;">'
'[<a href="%s" style="color:cyan">edit</a>]'
'</pre>') % edit_page_link
"</pre>"
) % edit_page_link
result = re.sub("<pre>", edit_button + form_html + "<pre>", result)
result = re.sub("<head>", "<head>" + title, result)
if not request_options.get('quiet'):
result = result.replace('</body>',
TWITTER_BUTTON \
+ GITHUB_BUTTON \
+ repository_button \
+ GITHUB_BUTTON_FOOTER \
+ '</body>')
if not request_options.get("quiet"):
result = result.replace(
"</body>",
TWITTER_BUTTON
+ GITHUB_BUTTON
+ repository_button
+ GITHUB_BUTTON_FOOTER
+ "</body>",
)
return result
+3
View File
@@ -9,6 +9,7 @@ from __future__ import print_function
import sys
import logging
def fatal(text):
"""
Fatal error function.
@@ -18,6 +19,7 @@ def fatal(text):
sys.stderr.write("ERROR: %s\n" % text)
sys.exit(1)
def error(text):
"""
Log error `text` and produce a RuntimeError exception
@@ -27,6 +29,7 @@ def error(text):
logging.error("ERROR %s", text)
raise RuntimeError(text)
def log(text):
"""
Log error `text` (if it does not start with 'Too many queries')
+180 -182
View File
@@ -9,138 +9,135 @@ from the project tree.
import pygments.lexers
LEXER = {
"assembly" : pygments.lexers.NasmLexer,
"awk" : pygments.lexers.AwkLexer,
"bash" : pygments.lexers.BashLexer,
"basic" : pygments.lexers.QBasicLexer,
"bf" : pygments.lexers.BrainfuckLexer,
"chapel" : pygments.lexers.ChapelLexer,
"clojure" : pygments.lexers.ClojureLexer,
"coffee" : pygments.lexers.CoffeeScriptLexer,
"cpp" : pygments.lexers.CppLexer,
"c" : pygments.lexers.CLexer,
"csharp" : pygments.lexers.CSharpLexer,
"d" : pygments.lexers.DLexer,
"dart" : pygments.lexers.DartLexer,
"delphi" : pygments.lexers.DelphiLexer,
"elisp" : pygments.lexers.EmacsLispLexer,
"elixir" : pygments.lexers.ElixirLexer,
"elm" : pygments.lexers.ElmLexer,
"erlang" : pygments.lexers.ErlangLexer,
"factor" : pygments.lexers.FactorLexer,
"forth" : pygments.lexers.ForthLexer,
"fortran" : pygments.lexers.FortranLexer,
"fsharp" : pygments.lexers.FSharpLexer,
"git" : pygments.lexers.BashLexer,
"go" : pygments.lexers.GoLexer,
"groovy" : pygments.lexers.GroovyLexer,
"haskell" : pygments.lexers.HaskellLexer,
"java" : pygments.lexers.JavaLexer,
"js" : pygments.lexers.JavascriptLexer,
"julia" : pygments.lexers.JuliaLexer,
"kotlin" : pygments.lexers.KotlinLexer,
"latex" : pygments.lexers.TexLexer,
"lisp" : pygments.lexers.CommonLispLexer,
"lua" : pygments.lexers.LuaLexer,
"assembly": pygments.lexers.NasmLexer,
"awk": pygments.lexers.AwkLexer,
"bash": pygments.lexers.BashLexer,
"basic": pygments.lexers.QBasicLexer,
"bf": pygments.lexers.BrainfuckLexer,
"chapel": pygments.lexers.ChapelLexer,
"clojure": pygments.lexers.ClojureLexer,
"coffee": pygments.lexers.CoffeeScriptLexer,
"cpp": pygments.lexers.CppLexer,
"c": pygments.lexers.CLexer,
"csharp": pygments.lexers.CSharpLexer,
"d": pygments.lexers.DLexer,
"dart": pygments.lexers.DartLexer,
"delphi": pygments.lexers.DelphiLexer,
"elisp": pygments.lexers.EmacsLispLexer,
"elixir": pygments.lexers.ElixirLexer,
"elm": pygments.lexers.ElmLexer,
"erlang": pygments.lexers.ErlangLexer,
"factor": pygments.lexers.FactorLexer,
"forth": pygments.lexers.ForthLexer,
"fortran": pygments.lexers.FortranLexer,
"fsharp": pygments.lexers.FSharpLexer,
"git": pygments.lexers.BashLexer,
"go": pygments.lexers.GoLexer,
"groovy": pygments.lexers.GroovyLexer,
"haskell": pygments.lexers.HaskellLexer,
"java": pygments.lexers.JavaLexer,
"js": pygments.lexers.JavascriptLexer,
"julia": pygments.lexers.JuliaLexer,
"kotlin": pygments.lexers.KotlinLexer,
"latex": pygments.lexers.TexLexer,
"lisp": pygments.lexers.CommonLispLexer,
"lua": pygments.lexers.LuaLexer,
"mathematica": pygments.lexers.MathematicaLexer,
"matlab" : pygments.lexers.MatlabLexer,
"mongo" : pygments.lexers.JavascriptLexer,
"nim" : pygments.lexers.NimrodLexer,
"matlab": pygments.lexers.MatlabLexer,
"mongo": pygments.lexers.JavascriptLexer,
"nim": pygments.lexers.NimrodLexer,
"objective-c": pygments.lexers.ObjectiveCppLexer,
"ocaml" : pygments.lexers.OcamlLexer,
"octave" : pygments.lexers.OctaveLexer,
"perl" : pygments.lexers.PerlLexer,
"perl6" : pygments.lexers.Perl6Lexer,
"php" : pygments.lexers.PhpLexer,
"psql" : pygments.lexers.PostgresLexer,
"python" : pygments.lexers.PythonLexer,
"python3" : pygments.lexers.Python3Lexer,
"r" : pygments.lexers.SLexer,
"racket" : pygments.lexers.RacketLexer,
"ruby" : pygments.lexers.RubyLexer,
"rust" : pygments.lexers.RustLexer,
"solidity" : pygments.lexers.JavascriptLexer,
"scala" : pygments.lexers.ScalaLexer,
"scheme": pygments.lexers.SchemeLexer,
"psql" : pygments.lexers.SqlLexer,
"sql" : pygments.lexers.SqlLexer,
"swift" : pygments.lexers.SwiftLexer,
"tcl" : pygments.lexers.TclLexer,
"tcsh" : pygments.lexers.TcshLexer,
"vb" : pygments.lexers.VbNetLexer,
"vbnet" : pygments.lexers.VbNetLexer,
"vim" : pygments.lexers.VimLexer,
"ocaml": pygments.lexers.OcamlLexer,
"octave": pygments.lexers.OctaveLexer,
"perl": pygments.lexers.PerlLexer,
"perl6": pygments.lexers.Perl6Lexer,
"php": pygments.lexers.PhpLexer,
"psql": pygments.lexers.PostgresLexer,
"python": pygments.lexers.PythonLexer,
"python3": pygments.lexers.Python3Lexer,
"r": pygments.lexers.SLexer,
"racket": pygments.lexers.RacketLexer,
"ruby": pygments.lexers.RubyLexer,
"rust": pygments.lexers.RustLexer,
"solidity": pygments.lexers.JavascriptLexer,
"scala": pygments.lexers.ScalaLexer,
"scheme": pygments.lexers.SchemeLexer,
"psql": pygments.lexers.SqlLexer,
"sql": pygments.lexers.SqlLexer,
"swift": pygments.lexers.SwiftLexer,
"tcl": pygments.lexers.TclLexer,
"tcsh": pygments.lexers.TcshLexer,
"vb": pygments.lexers.VbNetLexer,
"vbnet": pygments.lexers.VbNetLexer,
"vim": pygments.lexers.VimLexer,
# experimental
"arduino": pygments.lexers.ArduinoLexer,
"pike" : pygments.lexers.PikeLexer,
"eiffel" : pygments.lexers.EiffelLexer,
"clean" : pygments.lexers.CleanLexer,
"dylan" : pygments.lexers.DylanLexer,
# not languages
"cmake" : pygments.lexers.CMakeLexer,
"django" : pygments.lexers.PythonLexer,
"flask" : pygments.lexers.PythonLexer,
"arduino": pygments.lexers.ArduinoLexer,
"pike": pygments.lexers.PikeLexer,
"eiffel": pygments.lexers.EiffelLexer,
"clean": pygments.lexers.CleanLexer,
"dylan": pygments.lexers.DylanLexer,
# not languages
"cmake": pygments.lexers.CMakeLexer,
"django": pygments.lexers.PythonLexer,
"flask": pygments.lexers.PythonLexer,
}
# canonical names are on the right side
LANGUAGE_ALIAS = {
'asm' : 'assembly',
'assembler' : 'assembly',
'c++' : 'cpp',
'c#' : 'csharp',
'clisp' : 'lisp',
'coffeescript': 'coffee',
'cplusplus' : 'cpp',
'dlang' : 'd',
'f#' : 'fsharp',
'golang' : 'go',
'javascript': 'js',
'objc' : 'objective-c',
'p6' : 'perl6',
'sh' : 'bash',
'visualbasic': 'vb',
'vba' : 'vb',
'wolfram' : 'mathematica',
'mma' : 'mathematica',
'wolfram-mathematica': 'mathematica',
'm' : 'octave',
"asm": "assembly",
"assembler": "assembly",
"c++": "cpp",
"c#": "csharp",
"clisp": "lisp",
"coffeescript": "coffee",
"cplusplus": "cpp",
"dlang": "d",
"f#": "fsharp",
"golang": "go",
"javascript": "js",
"objc": "objective-c",
"p6": "perl6",
"sh": "bash",
"visualbasic": "vb",
"vba": "vb",
"wolfram": "mathematica",
"mma": "mathematica",
"wolfram-mathematica": "mathematica",
"m": "octave",
}
VIM_NAME = {
'assembly' : 'asm',
'bash' : 'sh',
'coffeescript': 'coffee',
'csharp' : 'cs',
'delphi' : 'pascal',
'dlang' : 'd',
'elisp' : 'newlisp',
'latex' : 'tex',
'forth' : 'fs',
'nim' : 'nimrod',
'perl6' : 'perl',
'python3' : 'python',
'python-3.x': 'python',
'tcsh' : 'sh',
'solidity' : 'js',
'mathematica': 'mma',
'wolfram-mathematica': 'mma',
'psql' : 'sql',
"assembly": "asm",
"bash": "sh",
"coffeescript": "coffee",
"csharp": "cs",
"delphi": "pascal",
"dlang": "d",
"elisp": "newlisp",
"latex": "tex",
"forth": "fs",
"nim": "nimrod",
"perl6": "perl",
"python3": "python",
"python-3.x": "python",
"tcsh": "sh",
"solidity": "js",
"mathematica": "mma",
"wolfram-mathematica": "mma",
"psql": "sql",
# not languages
'cmake' : 'sh',
'git' : 'sh',
'django' : 'python',
'flask' : 'python',
"cmake": "sh",
"git": "sh",
"django": "python",
"flask": "python",
}
SO_NAME = {
'coffee' : 'coffeescript',
'js' : 'javascript',
'python3' : 'python-3.x',
'vb' : 'vba',
'mathematica': 'wolfram-mathematica',
"coffee": "coffeescript",
"js": "javascript",
"python3": "python-3.x",
"vb": "vba",
"mathematica": "wolfram-mathematica",
}
@@ -149,85 +146,83 @@ SO_NAME = {
# into canonical cheat.sh names
#
ATOM_FT_NAME = {
}
ATOM_FT_NAME = {}
EMACS_FT_NAME = {
"asm-mode" : "asm",
"awk-mode" : "awk",
"sh-mode" : "bash",
"asm-mode": "asm",
"awk-mode": "awk",
"sh-mode": "bash",
# basic
"brainfuck-mode" : "bf",
"brainfuck-mode": "bf",
# chapel
"clojure-mode" : "clojure",
"coffee-mode" : "coffee",
"c++-mode" : "cpp",
"c-mode" : "c",
"csharp-mode" : "csharp",
"d-mode" : "d",
"dart-mode" : "dart",
"dylan-mode" : "dylan",
"delphi-mode" : "delphi",
"emacs-lisp-mode" : "elisp",
"clojure-mode": "clojure",
"coffee-mode": "coffee",
"c++-mode": "cpp",
"c-mode": "c",
"csharp-mode": "csharp",
"d-mode": "d",
"dart-mode": "dart",
"dylan-mode": "dylan",
"delphi-mode": "delphi",
"emacs-lisp-mode": "elisp",
# elixir
"elm-mode" : "elm",
"erlang-mode" : "erlang",
"elm-mode": "elm",
"erlang-mode": "erlang",
# factor
"forth-mode" : "forth",
"fortran-mode" : "fortran",
"fsharp-mode" : "fsharp",
"go-mode" : "go",
"groovy-mode" : "groovy",
"haskell-mode" : "haskell",
"forth-mode": "forth",
"fortran-mode": "fortran",
"fsharp-mode": "fsharp",
"go-mode": "go",
"groovy-mode": "groovy",
"haskell-mode": "haskell",
# "hy-mode"
"java-mode" : "java",
"js-jsx-mode" : "js",
"js-mode" : "js",
"js2-jsx-mode" : "js",
"js2-mode" : "js",
"julia-mode" : "julia",
"kotlin-mode" : "kotlin",
"java-mode": "java",
"js-jsx-mode": "js",
"js-mode": "js",
"js2-jsx-mode": "js",
"js2-mode": "js",
"julia-mode": "julia",
"kotlin-mode": "kotlin",
"lisp-interaction-mode": "lisp",
"lisp-mode" : "lisp",
"lua-mode" : "lua",
"lisp-mode": "lisp",
"lua-mode": "lua",
# mathematica
"matlab-mode" : "matlab",
"matlab-mode": "matlab",
# mongo
"objc-mode" : "objective-c",
"objc-mode": "objective-c",
# ocaml
"perl-mode" : "perl",
"perl6-mode" : "perl6",
"php-mode" : "php",
"perl-mode": "perl",
"perl6-mode": "perl6",
"php-mode": "php",
# psql
"python-mode" : "python",
"python-mode": "python",
# python3
# r -- ess looks it, but I don't know the mode name off hand
"racket-mode" : "racket",
"ruby-mode" : "ruby",
"rust-mode" : "rust",
"solidity-mode" : "solidity",
"scala-mode" : "scala",
"scheme-mode" : "scheme",
"sql-mode" : "sql",
"swift-mode" : "swift",
"tcl-mode" : "tcl",
"racket-mode": "racket",
"ruby-mode": "ruby",
"rust-mode": "rust",
"solidity-mode": "solidity",
"scala-mode": "scala",
"scheme-mode": "scheme",
"sql-mode": "sql",
"swift-mode": "swift",
"tcl-mode": "tcl",
# tcsh
"visual-basic-mode" : "vb",
"visual-basic-mode": "vb",
# vbnet
# vim
}
SUBLIME_FT_NAME = {
}
SUBLIME_FT_NAME = {}
VIM_FT_NAME = {
'asm': 'assembler',
'javascript': 'js',
'octave': 'matlab',
"asm": "assembler",
"javascript": "js",
"octave": "matlab",
}
VSCODE_FT_NAME = {
}
VSCODE_FT_NAME = {}
def rewrite_editor_section_name(section_name):
"""
@@ -248,29 +243,32 @@ def rewrite_editor_section_name(section_name):
>>> rewrite_editor_section_name('vscode:js')
'js'
"""
if ':' not in section_name:
if ":" not in section_name:
return section_name
editor_name, section_name = section_name.split(':', 1)
editor_name, section_name = section_name.split(":", 1)
editor_name_mapping = {
'atom': ATOM_FT_NAME,
'emacs': EMACS_FT_NAME,
'sublime': SUBLIME_FT_NAME,
'vim': VIM_FT_NAME,
'vscode': VSCODE_FT_NAME,
"atom": ATOM_FT_NAME,
"emacs": EMACS_FT_NAME,
"sublime": SUBLIME_FT_NAME,
"vim": VIM_FT_NAME,
"vscode": VSCODE_FT_NAME,
}
if editor_name not in editor_name_mapping:
return section_name
return editor_name_mapping[editor_name].get(section_name, section_name)
def get_lexer_name(section_name):
"""
Rewrite `section_name` for the further lexer search (for syntax highlighting)
"""
if ':' in section_name:
if ":" in section_name:
section_name = rewrite_editor_section_name(section_name)
return LANGUAGE_ALIAS.get(section_name, section_name)
if __name__ == "__main__":
import doctest
doctest.testmod()
+20 -14
View File
@@ -19,14 +19,16 @@ Usage:
import time
from globals import log
_WHITELIST = ['5.9.243.177']
_WHITELIST = ["5.9.243.177"]
def _time_caps(minutes, hours, days):
return {
'min': minutes,
'hour': hours,
'day': days,
}
"min": minutes,
"hour": hours,
"day": days,
}
class Limits(object):
"""
@@ -38,17 +40,17 @@ class Limits(object):
"""
def __init__(self):
self.intervals = ['min', 'hour', 'day']
self.intervals = ["min", "hour", "day"]
self.divisor = _time_caps(60, 3600, 86400)
self.limit = _time_caps(30, 600, 1000)
self.last_update = _time_caps(0, 0, 0)
self.counter = {
'min': {},
'hour': {},
'day': {},
}
"min": {},
"hour": {},
"day": {},
}
self._clear_counters_if_needed()
@@ -60,13 +62,15 @@ class Limits(object):
def _limit_exceeded(self, interval, ip_address):
visits = self.counter[interval][ip_address]
limit = self._get_limit(interval)
return visits > limit
return visits > limit
def _get_limit(self, interval):
return self.limit[interval]
def _report_excessive_visits(self, interval, ip_address):
log("%s LIMITED [%s for %s]" % (ip_address, self._get_limit(interval), interval))
log(
"%s LIMITED [%s for %s]" % (ip_address, self._get_limit(interval), interval)
)
def check_ip(self, ip_address):
"""
@@ -80,8 +84,10 @@ class Limits(object):
self._log_visit(interval, ip_address)
if self._limit_exceeded(interval, ip_address):
self._report_excessive_visits(interval, ip_address)
return ("Not so fast! Number of queries per %s is limited to %s"
% (interval, self._get_limit(interval)))
return "Not so fast! Number of queries per %s is limited to %s" % (
interval,
self._get_limit(interval),
)
return None
def reset(self):
+7 -6
View File
@@ -2,24 +2,25 @@
Parse query arguments.
"""
def parse_args(args):
"""
Parse arguments and options.
Replace short options with their long counterparts.
"""
result = {
'add_comments': True,
"add_comments": True,
}
query = ""
newargs = {}
for key, val in args.items():
if val == "" or val == [] or val == ['']:
if val == "" or val == [] or val == [""]:
query += key
continue
if val == 'True':
if val == "True":
val = True
if val == 'False':
if val == "False":
val = False
newargs[key] = val
@@ -27,8 +28,8 @@ def parse_args(args):
"c": dict(add_comments=False, unindent_code=False),
"C": dict(add_comments=False, unindent_code=True),
"Q": dict(remove_text=True),
'q': dict(quiet=True),
'T': {'no-terminal': True},
"q": dict(quiet=True),
"T": {"no-terminal": True},
}
for option, meaning in options_meaning.items():
if option in query:
+17 -17
View File
@@ -1,25 +1,25 @@
import os
import json
COLORS_JSON = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'colors.json')
COLOR_TABLE = json.loads(open(COLORS_JSON, 'r').read())
VALID_COLORS = [x['hexString'] for x in COLOR_TABLE]
HEX_TO_ANSI = {x['hexString']:x['colorId'] for x in COLOR_TABLE}
COLORS_JSON = os.path.join(os.path.dirname(os.path.abspath(__file__)), "colors.json")
COLOR_TABLE = json.loads(open(COLORS_JSON, "r").read())
VALID_COLORS = [x["hexString"] for x in COLOR_TABLE]
HEX_TO_ANSI = {x["hexString"]: x["colorId"] for x in COLOR_TABLE}
def rgb_from_str(s):
# s starts with a #.
r, g, b = int(s[1:3],16), int(s[3:5], 16),int(s[5:7], 16)
return r, g, b
# s starts with a #.
r, g, b = int(s[1:3], 16), int(s[3:5], 16), int(s[5:7], 16)
return r, g, b
def find_nearest_color(hex_color):
def find_nearest_color(hex_color):
R, G, B = rgb_from_str(hex_color)
mindiff = None
for d in VALID_COLORS:
r, g, b = rgb_from_str(d)
diff = abs(R -r)*256 + abs(G-g)* 256 + abs(B- b)* 256
if mindiff is None or diff < mindiff:
mindiff = diff
mincolorname = d
return mincolorname
for d in VALID_COLORS:
r, g, b = rgb_from_str(d)
diff = abs(R - r) * 256 + abs(G - g) * 256 + abs(B - b) * 256
if mindiff is None or diff < mindiff:
mindiff = diff
mincolorname = d
return mincolorname
+139 -87
View File
@@ -25,27 +25,29 @@ import pyte
# http://stackoverflow.com/questions/19782975/convert-rgb-color-to-the-nearest-color-in-palette-web-safe-color
try:
basestring # Python 2
basestring # Python 2
except NameError:
basestring = str # Python 3
def color_mapping(clr):
if clr == 'default':
if clr == "default":
return None
return clr
class Point(object):
"""
One point (character) on a terminal
"""
def __init__(self, char=None, foreground=None, background=None):
self.foreground = foreground
self.background = background
self.char = char
class Panela:
class Panela:
"""
To implement:
@@ -110,9 +112,9 @@ class Panela:
return False
return True
#
# Blocks manipulation
#
#
# Blocks manipulation
#
def copy(self, x1, y1, x2, y2):
@@ -130,14 +132,13 @@ class Panela:
if y1 > y2:
y1, y2 = y2, y1
field = [self.field[i] for i in range(y1, y2+1)]
field = [line[x1:x2+1] for line in field]
field = [self.field[i] for i in range(y1, y2 + 1)]
field = [line[x1 : x2 + 1] for line in field]
return Panela(field=field)
def cut(self, x1, y1, x2, y2):
"""
"""
""" """
if x1 < 0:
x1 += self.size_x
if x2 < 0:
@@ -154,8 +155,8 @@ class Panela:
copied = self.copy(x1, y1, x2, y2)
for y in range(y1, y2+1):
for x in range(x1, x2+1):
for y in range(y1, y2 + 1):
for x in range(x1, x2 + 1):
self.field[y][x] = Point()
return copied
@@ -170,7 +171,9 @@ class Panela:
self.size_x += cols
if rows and rows > 0:
self.field = self.field + [[Point() for _ in range(self.size_x)] for _ in range(rows)]
self.field = self.field + [
[Point() for _ in range(self.size_x)] for _ in range(rows)
]
self.size_y += rows
def crop(self, left=None, right=None, top=None, bottom=None):
@@ -224,17 +227,24 @@ class Panela:
y_extend = y1 + panela.size_y - self.size_y
self.extend(cols=x_extend, rows=y_extend)
for i in range(y1, min(self.size_y, y1+panela.size_y)):
for j in range(x1, min(self.size_x, x1+panela.size_x)):
for i in range(y1, min(self.size_y, y1 + panela.size_y)):
for j in range(x1, min(self.size_x, x1 + panela.size_x)):
if transparence:
if panela.field[i-y1][j-x1].char and panela.field[i-y1][j-x1].char != " ":
if panela.field[i-y1][j-x1].foreground:
self.field[i][j].foreground = panela.field[i-y1][j-x1].foreground
if panela.field[i-y1][j-x1].background:
self.field[i][j].background = panela.field[i-y1][j-x1].background
self.field[i][j].char = panela.field[i-y1][j-x1].char
if (
panela.field[i - y1][j - x1].char
and panela.field[i - y1][j - x1].char != " "
):
if panela.field[i - y1][j - x1].foreground:
self.field[i][j].foreground = panela.field[i - y1][
j - x1
].foreground
if panela.field[i - y1][j - x1].background:
self.field[i][j].background = panela.field[i - y1][
j - x1
].background
self.field[i][j].char = panela.field[i - y1][j - x1].char
else:
self.field[i][j] = panela.field[i-y1][j-x1]
self.field[i][j] = panela.field[i - y1][j - x1]
def strip(self):
"""
@@ -269,14 +279,14 @@ class Panela:
top += 1
bottom = 0
while bottom < self.size_y and empty_line(self.field[-(bottom+1)]):
while bottom < self.size_y and empty_line(self.field[-(bottom + 1)]):
bottom += 1
self.crop(left=left, right=right, top=top, bottom=bottom)
#
# Drawing and painting
#
#
# Drawing and painting
#
def put_point(self, col, row, char=None, color=None, background=None):
"""
@@ -294,7 +304,9 @@ class Panela:
if color:
self.field[row][col].foreground = color
else:
self.field[row][col] = Point(char=char, foreground=color, background=background)
self.field[row][col] = Point(
char=char, foreground=color, background=background
)
def put_string(self, col, row, s=None, color=None, background=None):
"""
@@ -302,7 +314,7 @@ class Panela:
ad <col>, <row>
"""
for i, c in enumerate(s):
self.put_point(col+i, row, c, color=color, background=background)
self.put_point(col + i, row, c, color=color, background=background)
def put_line(self, x1, y1, x2, y2, char=None, color=None, background=None):
"""
@@ -384,14 +396,16 @@ class Panela:
else:
char_iter = itertools.repeat(char)
for x, y in get_line((x1,y1), (x2, y2)):
for x, y in get_line((x1, y1), (x2, y2)):
char = next(char_iter)
color = next(color_iter)
background = next(background_iter)
self.put_point(x, y, char=char, color=color, background=background)
def paint(self, x1, y1, x2, y2, c1, c2=None, bg1=None, bg2=None, angle=None, angle_bg=None):
def paint(
self, x1, y1, x2, y2, c1, c2=None, bg1=None, bg2=None, angle=None, angle_bg=None
):
"""
Paint rectangle (x1,y1) (x2,y2) with foreground color c1 and background bg1 if specified.
If spefied colors c2/bg2, rectangle is painted with linear gradient (inclined under angle).
@@ -405,9 +419,13 @@ class Panela:
r1, g1, b1 = rgb_from_str(c1)
r2, g2, b2 = rgb_from_str(c2)
k = 1.0*(j-x1)/(x2-x1)*(1-a)
l = 1.0*(i-y1)/(y2-y1)*a
r3, g3, b3 = int(r1 + 1.0*(r2-r1)*(k+l)), int(g1 + 1.0*(g2-g1)*(k+l)), int(b1 + 1.0*(b2-b1)*(k+l))
k = 1.0 * (j - x1) / (x2 - x1) * (1 - a)
l = 1.0 * (i - y1) / (y2 - y1) * a
r3, g3, b3 = (
int(r1 + 1.0 * (r2 - r1) * (k + l)),
int(g1 + 1.0 * (g2 - g1) * (k + l)),
int(b1 + 1.0 * (b2 - b1) * (k + l)),
)
return "#%02x%02x%02x" % (r3, g3, b3)
@@ -419,14 +437,18 @@ class Panela:
r1, g1, b1 = rgb_from_str(bg1)
r2, g2, b2 = rgb_from_str(bg2)
k = 1.0*(j-x1)/(x2-x1)*(1-a)
l = 1.0*(i-y1)/(y2-y1)*a
r3, g3, b3 = int(r1 + 1.0*(r2-r1)*(k+l)), int(g1 + 1.0*(g2-g1)*(k+l)), int(b1 + 1.0*(b2-b1)*(k+l))
k = 1.0 * (j - x1) / (x2 - x1) * (1 - a)
l = 1.0 * (i - y1) / (y2 - y1) * a
r3, g3, b3 = (
int(r1 + 1.0 * (r2 - r1) * (k + l)),
int(g1 + 1.0 * (g2 - g1) * (k + l)),
int(b1 + 1.0 * (b2 - b1) * (k + l)),
)
return "#%02x%02x%02x" % (r3, g3, b3)
if c2 == None:
for i in range(y1,y2):
for i in range(y1, y2):
for j in range(x1, x2):
self.field[i][j].foreground = c1
if bg1:
@@ -435,7 +457,7 @@ class Panela:
else:
self.field[i][j].background = bg1
else:
for i in range(y1,y2):
for i in range(y1, y2):
for j in range(x1, x2):
self.field[i][j].foreground = calculate_color(i, j)
if bg1:
@@ -446,20 +468,22 @@ class Panela:
return self
def put_rectangle(self, x1, y1, x2, y2, char=None, frame=None, color=None, background=None):
def put_rectangle(
self, x1, y1, x2, y2, char=None, frame=None, color=None, background=None
):
"""
Draw rectangle (x1,y1), (x2,y2) using <char> character, <color> and <background> color
"""
frame_chars = {
'ascii': u'++++-|',
'single': u'┌┐└┘─│',
'double': u'┌┐└┘─│',
"ascii": "++++-|",
"single": "┌┐└┘─│",
"double": "┌┐└┘─│",
}
if frame in frame_chars:
chars = frame_chars[frame]
else:
chars = char*6
chars = char * 6
for x in range(x1, x2):
self.put_point(x, y1, char=chars[4], color=color, background=background)
@@ -474,7 +498,6 @@ class Panela:
self.put_point(x1, y2, char=chars[2], color=color, background=background)
self.put_point(x2, y2, char=chars[3], color=color, background=background)
def put_circle(self, x0, y0, radius, char=None, color=None, background=None):
"""
Draw cricle with center in (x, y) and radius r (x1,y1), (x2,y2)
@@ -482,7 +505,7 @@ class Panela:
"""
def k(x):
return int(x*1.9)
return int(x * 1.9)
f = 1 - radius
ddf_x = 1
@@ -491,44 +514,66 @@ class Panela:
y = radius
self.put_point(x0, y0 + radius, char=char, color=color, background=background)
self.put_point(x0, y0 - radius, char=char, color=color, background=background)
self.put_point(x0 + k(radius), y0, char=char, color=color, background=background)
self.put_point(x0 - k(radius), y0, char=char, color=color, background=background)
self.put_point(
x0 + k(radius), y0, char=char, color=color, background=background
)
self.put_point(
x0 - k(radius), y0, char=char, color=color, background=background
)
char = "x"
while x < y:
if f >= 0:
if f >= 0:
y -= 1
ddf_y += 2
f += ddf_y
x += 1
ddf_x += 2
f += ddf_x
self.put_point(x0 + k(x), y0 + y, char=char, color=color, background=background)
self.put_point(x0 - k(x), y0 + y, char=char, color=color, background=background)
self.put_point(x0 + k(x), y0 - y, char=char, color=color, background=background)
self.put_point(x0 - k(x), y0 - y, char=char, color=color, background=background)
self.put_point(x0 + k(y), y0 + x, char=char, color=color, background=background)
self.put_point(x0 - k(y), y0 + x, char=char, color=color, background=background)
self.put_point(x0 + k(y), y0 - x, char=char, color=color, background=background)
self.put_point(x0 - k(y), y0 - x, char=char, color=color, background=background)
f += ddf_x
self.put_point(
x0 + k(x), y0 + y, char=char, color=color, background=background
)
self.put_point(
x0 - k(x), y0 + y, char=char, color=color, background=background
)
self.put_point(
x0 + k(x), y0 - y, char=char, color=color, background=background
)
self.put_point(
x0 - k(x), y0 - y, char=char, color=color, background=background
)
self.put_point(
x0 + k(y), y0 + x, char=char, color=color, background=background
)
self.put_point(
x0 - k(y), y0 + x, char=char, color=color, background=background
)
self.put_point(
x0 + k(y), y0 - x, char=char, color=color, background=background
)
self.put_point(
x0 - k(y), y0 - x, char=char, color=color, background=background
)
def read_ansi(self, seq, x=0, y=0, transparence=True):
"""
Read ANSI sequence and render it to the panela starting from x and y.
If transparence is True, replace spaces with ""
"""
screen = pyte.screens.Screen(self.size_x, self.size_y+1)
screen = pyte.screens.Screen(self.size_x, self.size_y + 1)
stream = pyte.streams.ByteStream()
stream.attach(screen)
stream.feed(seq.replace('\n', '\r\n'))
stream.feed(seq.replace("\n", "\r\n"))
for i, line in sorted(screen.buffer.items(), key=lambda x: x[0]):
for j, char in sorted(line.items(), key=lambda x: x[0]):
if j >= self.size_x:
break
self.field[i][j] = Point(char.data, color_mapping(char.fg), color_mapping(char.bg))
self.field[i][j] = Point(
char.data, color_mapping(char.fg), color_mapping(char.bg)
)
def __str__(self):
answer = ""
@@ -540,72 +585,78 @@ class Panela:
stop = ""
if self.field[i][j].foreground:
fg_ansi = '\033[38;2;%s;%s;%sm' % rgb_from_str(self.field[i][j].foreground)
fg_ansi = "\033[38;2;%s;%s;%sm" % rgb_from_str(
self.field[i][j].foreground
)
stop = colored.attr("reset")
if self.field[i][j].background:
bg_ansi = '\033[48;2;%s;%s;%sm' % rgb_from_str(self.field[i][j].background)
bg_ansi = "\033[48;2;%s;%s;%sm" % rgb_from_str(
self.field[i][j].background
)
stop = colored.attr("reset")
char = c.char or " "
if not skip_next:
answer += fg_ansi + bg_ansi + char.encode('utf-8') + stop
answer += fg_ansi + bg_ansi + char.encode("utf-8") + stop
skip_next = wcswidth(char) == 2
# answer += "...\n"
answer += "\n"
return answer
########################################################################################################
class Template(object):
def __init__(self):
self._mode = 'page'
self._mode = "page"
self.page = []
self.mask = []
self.code = []
self.panela = None
self._colors = {
'A': '#00cc00',
'B': '#00cc00',
'C': '#00aacc',
'D': '#888888',
'E': '#cccc00',
'F': '#ff0000',
'H': '#22aa22',
'I': '#cc0000',
'J': '#000000',
"A": "#00cc00",
"B": "#00cc00",
"C": "#00aacc",
"D": "#888888",
"E": "#cccc00",
"F": "#ff0000",
"H": "#22aa22",
"I": "#cc0000",
"J": "#000000",
}
self._bg_colors = {
'G': '#555555',
'J': '#555555',
"G": "#555555",
"J": "#555555",
}
def _process_line(self, line):
if line == 'mask':
self._mode = 'mask'
if line == '':
self._mode = 'code'
if line == "mask":
self._mode = "mask"
if line == "":
self._mode = "code"
def read(self, filename):
"""
Read template from `filename`
"""
with open(filename) as f:
self._mode = 'page'
self._mode = "page"
for line in f.readlines():
line = line.rstrip('\n')
if line.startswith('==[') and line.endswith(']=='):
line = line.rstrip("\n")
if line.startswith("==[") and line.endswith("]=="):
self._process_line(line[3:-3].strip())
continue
if self._mode == 'page':
if self._mode == "page":
self.page.append(line)
elif self._mode == 'mask':
elif self._mode == "mask":
self.mask.append(line)
elif self._mode == 'code':
elif self._mode == "code":
self.mask.append(line)
def apply_mask(self):
@@ -631,6 +682,7 @@ class Template(object):
return self.page
def main():
"Only for experiments"
@@ -641,5 +693,5 @@ def main():
sys.stdout.write(template.show())
if __name__ == '__main__':
if __name__ == "__main__":
main()
+9 -5
View File
@@ -12,31 +12,35 @@ import os
import random
from config import CONFIG
def _save_cheatsheet(topic_name, cheatsheet):
"""
Save posted cheat sheet `cheatsheet` with `topic_name`
in the spool directory
"""
nonce = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(9))
filename = topic_name.replace('/', '.') + "." + nonce
nonce = "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(9)
)
filename = topic_name.replace("/", ".") + "." + nonce
filename = os.path.join(CONFIG["path.spool"], filename)
open(filename, 'w').write(cheatsheet)
open(filename, "w").write(cheatsheet)
def process_post_request(req, topic):
"""
Process POST request `req`.
"""
for key, val in req.form.items():
if key == '':
if key == "":
if topic is None:
topic_name = "UNNAMED"
else:
topic_name = topic
cheatsheet = val
else:
if val == '':
if val == "":
if topic is None:
topic_name = "UNNAMED"
else:
+25 -15
View File
@@ -1,36 +1,43 @@
import search
import fmt.comments
def postprocess(answer, keyword, options, request_options=None):
answer = _answer_add_comments(answer, request_options=request_options)
answer = _answer_filter_by_keyword(answer, keyword, options, request_options=request_options)
answer = _answer_filter_by_keyword(
answer, keyword, options, request_options=request_options
)
return answer
def _answer_add_comments(answer, request_options=None):
if answer['format'] != 'text+code':
if answer["format"] != "text+code":
return answer
topic = answer['topic']
topic = answer["topic"]
if "filetype" in answer:
filetype = answer["filetype"]
else:
filetype = 'bash'
if '/' in topic:
filetype = topic.split('/', 1)[0]
if filetype.startswith('q:'):
filetype = "bash"
if "/" in topic:
filetype = topic.split("/", 1)[0]
if filetype.startswith("q:"):
filetype = filetype[2:]
answer['answer'] = fmt.comments.beautify(
answer['answer'], filetype, request_options)
answer['format'] = 'code'
answer['filetype'] = filetype
answer["answer"] = fmt.comments.beautify(
answer["answer"], filetype, request_options
)
answer["format"] = "code"
answer["filetype"] = filetype
return answer
def _answer_filter_by_keyword(answer, keyword, options, request_options=None):
answer['answer'] = _filter_by_keyword(answer['answer'], keyword, options)
answer["answer"] = _filter_by_keyword(answer["answer"], keyword, options)
return answer
def _filter_by_keyword(answer, keyword, options):
def _join_paragraphs(paragraphs):
@@ -47,12 +54,15 @@ def _filter_by_keyword(answer, keyword, options):
answer.append(paragraph)
paragraph = ""
else:
paragraph += line+"\n"
paragraph += line + "\n"
answer.append(paragraph)
return answer
paragraphs = [p for p in _split_paragraphs(answer)
if search.match(p, keyword, options=options)]
paragraphs = [
p
for p in _split_paragraphs(answer)
if search.match(p, keyword, options=options)
]
if not paragraphs:
return ""
+57 -46
View File
@@ -21,8 +21,8 @@ import adapter.question
import adapter.rosetta
from config import CONFIG
class Router(object):
class Router(object):
"""
Implementation of query routing. Routing is based on `routing_table`
and the data exported by the adapters (functions `get_list()` and `is_found()`).
@@ -38,28 +38,27 @@ class Router(object):
adapter_class = adapter.all_adapters(as_dict=True)
active_adapters = set(CONFIG['adapters.active'] + CONFIG['adapters.mandatory'])
active_adapters = set(CONFIG["adapters.active"] + CONFIG["adapters.mandatory"])
self._adapter = {
"internal": adapter.internal.InternalPages(
get_topic_type=self.get_topic_type,
get_topics_list=self.get_topics_list),
get_topic_type=self.get_topic_type, get_topics_list=self.get_topics_list
),
"unknown": adapter.internal.UnknownPages(
get_topic_type=self.get_topic_type,
get_topics_list=self.get_topics_list),
get_topic_type=self.get_topic_type, get_topics_list=self.get_topics_list
),
}
for by_name in active_adapters:
if by_name not in self._adapter:
self._adapter[by_name] = adapter_class[by_name]()
self._topic_list = {
key: obj.get_list()
for key, obj in self._adapter.items()
}
self._topic_list = {key: obj.get_list() for key, obj in self._adapter.items()}
self.routing_table = CONFIG["routing.main"]
self.routing_table = CONFIG["routing.pre"] + self.routing_table + CONFIG["routing.post"]
self.routing_table = (
CONFIG["routing.pre"] + self.routing_table + CONFIG["routing.post"]
)
def get_topics_list(self, skip_dirs=False, skip_internal=False):
"""
@@ -69,7 +68,7 @@ class Router(object):
if self._cached_topics_list:
return self._cached_topics_list
skip = ['fosdem']
skip = ["fosdem"]
if skip_dirs:
skip.append("cheat.sheets dir")
if skip_internal:
@@ -78,7 +77,7 @@ class Router(object):
answer = {}
for key in sources_to_merge:
answer.update({name:key for name in self._topic_list[key]})
answer.update({name: key for name in self._topic_list[key]})
answer = sorted(set(answer.keys()))
self._cached_topics_list = answer
@@ -115,8 +114,9 @@ class Router(object):
"""
Return answer_dict for the `query`.
"""
return self._adapter[topic_type]\
.get_page_dict(query, request_options=request_options)
return self._adapter[topic_type].get_page_dict(
query, request_options=request_options
)
def handle_if_random_request(self, topic):
"""
@@ -127,30 +127,32 @@ class Router(object):
"""
def __select_random_topic(prefix, topic_list):
#Here we remove the special cases
cleaned_topic_list = [ x for x in topic_list if '/' not in x and ':' not in x]
# Here we remove the special cases
cleaned_topic_list = [
x for x in topic_list if "/" not in x and ":" not in x
]
#Here we still check that cleaned_topic_list in not empty
# Here we still check that cleaned_topic_list in not empty
if not cleaned_topic_list:
return prefix
random_topic = random.choice(cleaned_topic_list)
return prefix + random_topic
if topic.endswith('/:random') or topic.lstrip('/') == ':random':
#We strip the :random part and see if the query is valid by running a get_topics_list()
if topic.lstrip('/') == ':random' :
topic = topic.lstrip('/')
if topic.endswith("/:random") or topic.lstrip("/") == ":random":
# We strip the :random part and see if the query is valid by running a get_topics_list()
if topic.lstrip("/") == ":random":
topic = topic.lstrip("/")
prefix = topic[:-7]
topic_list = [x[len(prefix):]
for x in self.get_topics_list()
if x.startswith(prefix)]
if '' in topic_list:
topic_list.remove('')
topic_list = [
x[len(prefix) :] for x in self.get_topics_list() if x.startswith(prefix)
]
if topic_list:
if "" in topic_list:
topic_list.remove("")
if topic_list:
# This is a correct formatted random query like /cpp/:random as the topic_list is not empty.
random_topic = __select_random_topic(prefix, topic_list)
return random_topic
@@ -160,10 +162,12 @@ class Router(object):
wrongly_formatted_random = topic[:-8]
return wrongly_formatted_random
#Here if not a random requst, we just forward the topic
# Here if not a random requst, we just forward the topic
return topic
def get_answers(self, topic: str, request_options:Dict[str, str] = None) -> List[Dict[str, Any]]:
def get_answers(
self, topic: str, request_options: Dict[str, str] = None
) -> List[Dict[str, Any]]:
"""
Find cheat sheets for the topic.
@@ -173,7 +177,7 @@ class Router(object):
Returns:
[answer_dict]: list of answers (dictionaries)
"""
# if topic specified as <topic_type>:<topic>,
# cut <topic_type> off
topic_type = ""
@@ -191,21 +195,25 @@ class Router(object):
# 'question' queries are pretty expensive, that's why they should be handled
# in a special way:
# we do not drop the old style cache entries and try to reuse them if possible
if topic_types == ['question']:
answer = cache.get('q:' + topic)
if topic_types == ["question"]:
answer = cache.get("q:" + topic)
if answer:
if isinstance(answer, dict):
return [answer]
return [{
'topic': topic,
'topic_type': 'question',
'answer': answer,
'format': 'text+code',
}]
return [
{
"topic": topic,
"topic_type": "question",
"answer": answer,
"format": "text+code",
}
]
answer = self._get_page_dict(topic, topic_types[0], request_options=request_options)
answer = self._get_page_dict(
topic, topic_types[0], request_options=request_options
)
if answer.get("cache", True):
cache.put('q:' + topic, answer)
cache.put("q:" + topic, answer)
return [answer]
# Try to find cacheable queries in the cache.
@@ -224,7 +232,9 @@ class Router(object):
answers.append(answer)
continue
answer = self._get_page_dict(topic, topic_type, request_options=request_options)
answer = self._get_page_dict(
topic, topic_type, request_options=request_options
)
if isinstance(answer, dict):
if "cache" in answer:
cache_needed = answer["cache"]
@@ -236,6 +246,7 @@ class Router(object):
return answers
# pylint: disable=invalid-name
_ROUTER = Router()
get_topics_list = _ROUTER.get_topics_list
+18 -17
View File
@@ -24,28 +24,30 @@ import re
from config import CONFIG
from routing import get_answers, get_topics_list
def _limited_entry():
return {
'topic_type': 'LIMITED',
"topic_type": "LIMITED",
"topic": "LIMITED",
'answer': "LIMITED TO %s ANSWERS" % CONFIG['search.limit'],
'format': "code",
"answer": "LIMITED TO %s ANSWERS" % CONFIG["search.limit"],
"format": "code",
}
def _parse_options(options):
"""Parse search options string into optiond_dict
"""
"""Parse search options string into optiond_dict"""
if options is None:
return {}
search_options = {
'insensitive': 'i' in options,
'word_boundaries': 'b' in options,
'recursive': 'r' in options,
"insensitive": "i" in options,
"word_boundaries": "b" in options,
"recursive": "r" in options,
}
return search_options
def match(paragraph, keyword, options=None, options_dict=None):
"""Search for each keyword from `keywords` in `page`
and if all of them are found, return `True`.
@@ -58,8 +60,8 @@ def match(paragraph, keyword, options=None, options_dict=None):
if keyword is None:
return True
if '~' in keyword:
keywords = keyword.split('~')
if "~" in keyword:
keywords = keyword.split("~")
else:
keywords = [keyword]
@@ -82,6 +84,7 @@ def match(paragraph, keyword, options=None, options_dict=None):
return False
return True
def find_answers_by_keyword(directory, keyword, options="", request_options=None):
"""
Search in the whole tree of all cheatsheets or in its subtree `directory`
@@ -96,13 +99,13 @@ def find_answers_by_keyword(directory, keyword, options="", request_options=None
if not topic.startswith(directory):
continue
subtopic = topic[len(directory):]
if not options_dict["recursive"] and '/' in subtopic:
subtopic = topic[len(directory) :]
if not options_dict["recursive"] and "/" in subtopic:
continue
answer_dicts = get_answers(topic, request_options=request_options)
for answer_dict in answer_dicts:
answer_text = answer_dict.get('answer', '')
answer_text = answer_dict.get("answer", "")
# Temporary hotfix:
# In some cases answer_text may be 'bytes' and not 'str'
if type(b"") == type(answer_text):
@@ -111,10 +114,8 @@ def find_answers_by_keyword(directory, keyword, options="", request_options=None
if match(answer_text, keyword, options_dict=options_dict):
answers_found.append(answer_dict)
if len(answers_found) > CONFIG['search.limit']:
answers_found.append(
_limited_entry()
)
if len(answers_found) > CONFIG["search.limit"]:
answers_found.append(_limited_entry())
break
return answers_found
+14 -4
View File
@@ -6,29 +6,37 @@ from __future__ import print_function
import sys
import textwrap
try:
import urlparse
except ModuleNotFoundError:
import urllib.parse as urlparse
import config
config.CONFIG["cache.type"] = "none"
import cheat_wrapper
import options
def show_usage():
"""
Show how to use the program in the standalone mode
"""
print(textwrap.dedent("""
print(
textwrap.dedent(
"""
Usage:
lib/standalone.py [OPTIONS] QUERY
For OPTIONS see :help
""")[1:-1])
"""
)[1:-1]
)
def parse_cmdline(args):
"""
@@ -43,7 +51,8 @@ def parse_cmdline(args):
query_string = " ".join(args)
parsed = urlparse.urlparse("https://srv:0/%s" % query_string)
request_options = options.parse_args(
urlparse.parse_qs(parsed.query, keep_blank_values=True))
urlparse.parse_qs(parsed.query, keep_blank_values=True)
)
query = parsed.path.lstrip("/")
if not query:
@@ -61,5 +70,6 @@ def main(args):
answer, _ = cheat_wrapper.cheat_wrapper(query, request_options=request_options)
sys.stdout.write(answer)
if __name__ == '__main__':
if __name__ == "__main__":
main(sys.argv[1:])
+2
View File
@@ -4,12 +4,14 @@ Support for the stateful queries
import cache
def save_query(client_id, query):
"""
Save the last query `query` for the client `client_id`
"""
cache.put("l:%s" % client_id, query)
def last_query(client_id):
"""
Return the last query for the client `client_id`
+1
View File
@@ -16,3 +16,4 @@ colorama
pyyaml
python-Levenshtein
pytest
black