first commit
This commit is contained in:
commit
01f19ebb1d
22
.gitignore
vendored
Normal file
22
.gitignore
vendored
Normal file
@ -0,0 +1,22 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Bot files
|
||||
*.ini
|
||||
!example_config.ini
|
||||
*.json
|
||||
!configs/timetable/groups.json
|
||||
*.jpg
|
||||
*.db
|
||||
*.sqlite3
|
||||
*.txt
|
||||
*.pem
|
||||
!requirements.txt
|
||||
test.py
|
||||
venv/
|
||||
.vscode/
|
33
bot.py
Normal file
33
bot.py
Normal file
@ -0,0 +1,33 @@
|
||||
import logging
|
||||
import time
|
||||
import logging
|
||||
import sched
|
||||
import threading
|
||||
|
||||
from load import config, viber, app
|
||||
import handlers
|
||||
|
||||
|
||||
logging.basicConfig(
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
level=logging.INFO
|
||||
)
|
||||
|
||||
|
||||
def set_webhook(viber):
|
||||
viber.unset_webhook()
|
||||
viber.set_webhook(config.webhook.webhook_url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
scheduler = sched.scheduler(time.time, time.sleep)
|
||||
scheduler.enter(5, 1, set_webhook, (viber,))
|
||||
t = threading.Thread(target=scheduler.run)
|
||||
t.start()
|
||||
|
||||
context = (config.cert["cert"], config.cert["key"])
|
||||
app.run(host=config.webhook.host,
|
||||
port=int(config.webhook.port),
|
||||
debug=True,
|
||||
ssl_context=context
|
||||
)
|
32
config.py
Normal file
32
config.py
Normal file
@ -0,0 +1,32 @@
|
||||
from configparser import ConfigParser
|
||||
|
||||
from easydict import EasyDict as edict
|
||||
|
||||
|
||||
CONFIG_FILE = 'config.ini'
|
||||
|
||||
data = ConfigParser()
|
||||
data.read(CONFIG_FILE)
|
||||
|
||||
config = edict()
|
||||
config["cert"] = edict()
|
||||
|
||||
for section in data.sections():
|
||||
config[section] = edict()
|
||||
|
||||
for key, value in data.items(section):
|
||||
config[section][key] = value
|
||||
|
||||
config["cert"] = {
|
||||
"cert": f"{config.webhook.cert_folder}/{config.webhook.cert}",
|
||||
"key": f"{config.webhook.cert_folder}/{config.webhook.key}"
|
||||
}
|
||||
config["data_file"] = f"{config.parser.folder}/{config.parser.file}"
|
||||
|
||||
def parse_bool(section=None, key=None, text=None):
|
||||
if text is not None:
|
||||
return text.lower() in ['yes', 'true']
|
||||
|
||||
|
||||
if section is not None and key is not None:
|
||||
return config[section][key].lower() in ['yes', 'true']
|
109
handlers.py
Normal file
109
handlers.py
Normal file
@ -0,0 +1,109 @@
|
||||
import io
|
||||
import base64
|
||||
|
||||
from flask import request, Response, send_file
|
||||
from viberbot.api.messages.url_message import URLMessage
|
||||
from viberbot.api.messages.text_message import TextMessage
|
||||
from viberbot.api.messages.picture_message import PictureMessage
|
||||
from viberbot.api.messages.keyboard_message import KeyboardMessage
|
||||
from viberbot.api.viber_requests import ViberMessageRequest
|
||||
from viberbot.api.viber_requests import ViberConversationStartedRequest
|
||||
|
||||
from load import app, logger, viber, config
|
||||
from config import parse_bool
|
||||
from parser.parser import get_about_replacements, docs_parse
|
||||
|
||||
|
||||
ENABLE_PARSER = parse_bool(text=config.parser.enable_parse)
|
||||
|
||||
KEYBOARD = {
|
||||
"Type": "keyboard",
|
||||
"Buttons": [
|
||||
{
|
||||
"Columns": 6,
|
||||
"Rows": 1,
|
||||
"BgColor": "#e6f5ff",
|
||||
"BgLoop": True,
|
||||
"ActionType": "reply",
|
||||
"ActionBody": "replaces",
|
||||
#"ReplyType": "message",
|
||||
"Text": "Заміни"
|
||||
},
|
||||
{
|
||||
"Columns": 3,
|
||||
"Rows": 2,
|
||||
"BgColor": "#e6f5ff",
|
||||
"BgLoop": True,
|
||||
"ActionType": "reply",
|
||||
"ActionBody": "link",
|
||||
#"ReplyType": "message",
|
||||
"Text": "Посилання на файл"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
if ENABLE_PARSER:
|
||||
KEYBOARD["Buttons"].append({
|
||||
"Columns": 3,
|
||||
"Rows": 2,
|
||||
"BgColor": "#e6f5ff",
|
||||
"BgLoop": True,
|
||||
"ActionType": "reply",
|
||||
"ActionBody": "update",
|
||||
#"ReplyType": "message",
|
||||
"Text": "Обновити дані"
|
||||
})
|
||||
else:
|
||||
KEYBOARD["Buttons"][1]["Columns"] = 6
|
||||
|
||||
def get_text_button(index: int):
|
||||
return KEYBOARD["Buttons"][index]["ActionBody"]
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
def incoming():
|
||||
logger.debug("received request. post data: {0}".format(request.get_data()))
|
||||
|
||||
viber_request = viber.parse_request(request.get_data().decode('utf8'))
|
||||
|
||||
if isinstance(viber_request, ViberConversationStartedRequest):
|
||||
viber.send_messages(viber_request.user.id, [
|
||||
KeyboardMessage(keyboard=KEYBOARD)
|
||||
])
|
||||
|
||||
|
||||
if isinstance(viber_request, ViberMessageRequest):
|
||||
message = viber_request.message.text
|
||||
if message == "id": viber.send_messages(viber_request.sender.id, [TextMessage(text=viber_request.sender.id, keyboard=KEYBOARD)])
|
||||
if message == get_text_button(0):
|
||||
data = get_about_replacements()
|
||||
if 'image' in data:
|
||||
viber.send_messages(viber_request.sender.id, [
|
||||
PictureMessage(
|
||||
media=f"{config.webhook.webhook_url}/image",
|
||||
keyboard=KEYBOARD
|
||||
)
|
||||
])
|
||||
elif message == get_text_button(1):
|
||||
viber.send_messages(viber_request.sender.id, [
|
||||
URLMessage(media=config.parser.link, keyboard=KEYBOARD)
|
||||
])
|
||||
elif ENABLE_PARSER and message == get_text_button(2) and viber_request.sender.id:
|
||||
if viber_request.sender.id not in config.parser.admins.split(","):
|
||||
viber.send_messages(viber_request.sender.id, [TextMessage(text="Ви не адміністратор!")])
|
||||
else:
|
||||
try:
|
||||
docs_parse()
|
||||
viber.send_messages(viber_request.sender.id, [
|
||||
TextMessage(text="Замены обновлены!", keyboard=KEYBOARD)])
|
||||
except Exception as e:
|
||||
print(e)
|
||||
viber.send_messages(viber_request.sender.id, [
|
||||
TextMessage(text='Ошибка обновления', keyboard=KEYBOARD)])
|
||||
|
||||
return Response(status=200)
|
||||
|
||||
@app.route('/image')
|
||||
def get_image():
|
||||
data = get_about_replacements()
|
||||
if 'image' in data:
|
||||
return send_file(io.BytesIO(base64.b64decode(data['data']['all'])), mimetype='image/jpeg')
|
18
load.py
Normal file
18
load.py
Normal file
@ -0,0 +1,18 @@
|
||||
import logging
|
||||
|
||||
from flask import Flask
|
||||
from viberbot import Api
|
||||
from viberbot.api.bot_configuration import BotConfiguration
|
||||
|
||||
from config import config
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
viber = Api(BotConfiguration(
|
||||
name=config.bot.name,
|
||||
avatar=config.bot.avatar,
|
||||
auth_token=config.bot.token,
|
||||
))
|
2
parser/__init__.py
Normal file
2
parser/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from .parser import get_about_replacements, docs_parse
|
||||
__all__ = ['get_about_replacements', 'docs_parse']
|
67
parser/parser.py
Normal file
67
parser/parser.py
Normal file
@ -0,0 +1,67 @@
|
||||
import base64
|
||||
import json
|
||||
import datetime
|
||||
from datetime import datetime as dt
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
try:
|
||||
from load import config
|
||||
except ImportError: config = None
|
||||
try:
|
||||
from .utils import *
|
||||
except ImportError:
|
||||
from utils import *
|
||||
|
||||
|
||||
headers = {
|
||||
'user-agent':(
|
||||
"Mozilla/5.0 (Windows NT 10.0; WOW64) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||
"Chrome/62.0.3202.9 Safari/537.36"
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
def date_parser_helper(days:int, parse:str="%d.%m.20%y"):
|
||||
return dt.strftime(
|
||||
dt.now() +
|
||||
datetime.timedelta(days=days),
|
||||
parse
|
||||
)
|
||||
|
||||
|
||||
def docs_parse():
|
||||
|
||||
output = {
|
||||
"data":{},
|
||||
"another_teacher":None
|
||||
}
|
||||
|
||||
page = requests.get(config.parser.link, headers=headers)
|
||||
page.encoding = 'utf-8'
|
||||
|
||||
soup = BeautifulSoup(page.text, "lxml")
|
||||
|
||||
# Это в идеале нужно переписать...
|
||||
url = image_parser(soup)
|
||||
with requests.get(url=url, allow_redirects=True, stream=True) as r:
|
||||
output['image'] = True
|
||||
output['date'] = 'невозможно получить!'
|
||||
output['data']['all'] = base64.b64encode(r.content).decode('utf-8')
|
||||
|
||||
|
||||
with open(config.data_file, 'w') as f:
|
||||
json.dump(output, f, ensure_ascii=False)
|
||||
f.close()
|
||||
|
||||
|
||||
def get_about_replacements() -> dict:
|
||||
with open(config.data_file, 'r') as f:
|
||||
data = json.loads(f.read())
|
||||
f.close()
|
||||
return data
|
||||
|
||||
if __name__ == "__main__":
|
||||
docs_parse()
|
34
parser/utils.py
Normal file
34
parser/utils.py
Normal file
@ -0,0 +1,34 @@
|
||||
from bs4 import BeautifulSoup
|
||||
from typing import Any
|
||||
|
||||
def table_parser(soup: BeautifulSoup, output):
|
||||
#Date parser
|
||||
date = (soup.find("main").findAll('span', style="color:black"))[1]
|
||||
output["date"] = date.text.replace(u'\xa0', u'')
|
||||
|
||||
|
||||
#Replaces parser
|
||||
replaces = soup.findAll('tr')
|
||||
for data in replaces:
|
||||
|
||||
text = (
|
||||
data.find("td", valign="top")
|
||||
.find("span", style="color:black")
|
||||
.text.replace(u'\xa0', u'')
|
||||
)
|
||||
group = (
|
||||
data.find("span", style="color:black")
|
||||
.text.replace(" ", "").replace(u'\xa0', u''))
|
||||
output["data"][group] = text
|
||||
|
||||
return output
|
||||
|
||||
|
||||
def image_parser(soup: BeautifulSoup):
|
||||
image: Any
|
||||
extension = ('png', 'jpg')
|
||||
main = soup.find("main")
|
||||
for ext in extension:
|
||||
image = main.select(f'img[src$=".{ext}"]')
|
||||
if image:
|
||||
return image[0]['src']
|
Loading…
Reference in New Issue
Block a user