Init commit

This commit is contained in:
Jemacivan 2022-02-16 17:13:44 +02:00
commit 8060b933a5
Signed by: tema
GPG Key ID: 21FDB6D162488F6F
42 changed files with 1281 additions and 0 deletions

18
.gitignore vendored Normal file
View File

@ -0,0 +1,18 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Bot files
*.ini
!example_config.ini
*.json
*.db
*.sqlite3
*.txt
!requirements.txt
test.py
venv/

View File

@ -0,0 +1,38 @@
# Installing
**Create VirtualEnv**
```
python -m venv venv
```
**Activate VirtualEnv**
```
source venv/bin/activate
```
**Installing requirements**
```
pip install -r requirements.txt
```
**Rename example_config.ini -> config.ini
Enter Telegram Bot API Token on config.ini
Optional: setup webhook, telegram bot api server, etc.
Enter DocumentID ([How to get DocumentID](https://developers.google.com/docs/api/how-tos/overview#document_id))
[Create project and enable Docs API](https://developers.google.com/workspace/guides/create-project)
Put credentials.json ([How to get credentials.json](https://developers.google.com/workspace/guides/create-credentials?hl=ru#create_a_oauth_client_id_credential))**
**Run setup docs parser**
```
python setup_google_docs_api.py
```
**Follow Setup Wizard
Open web page
Login your account
If you get error, setup redirect in Project**
# Running
```
python bot.py
```
**Run on docker: Supported!**

View File

@ -0,0 +1,31 @@
[Docs_Settings]
Document_ID = 123ABC
Config_folder = configs/
token_file = token.json
credentials_file = credentials.json
data_file = data.json
[Bot]
token = 123:JAKD
; None = Not used local telegram bot api server
; Example http://127.0.0.1:8888
telegram_bot_api_server = none
; True or False
use_webhook = false
ip = 127.0.0.1
port = 3001
; if you don`t use local TelegramBotAPI Server + WebHooks -> Set settings in bot.py
[Users]
;Uncomment this variable, if you use filters to users
;allowed_users = 0,1,2,3
admin_users = 0,1,2,3
[DataBase]
enable_logging = yes
database_link = sqlite:///db.sqlite3
[announcements]
;Seconds only/
time = 14400
;////////////

View File

@ -0,0 +1,2 @@
from .parser import get_about_replacements, docs_parse
__all__ = ['get_about_replacements', 'docs_parse']

View File

@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from load import config
from .utils import Helper
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/documents.readonly']
__all__ = ['docs_parse', 'get_about_replacements']
def docs_parse() -> None:
creds = None
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists(config.token_file):
creds = Credentials.from_authorized_user_file(
config.token_file,
SCOPES
)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
config.credentials_file, SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(config.token_file, 'w') as token:
token.write(creds.to_json())
service = build('docs', 'v1', credentials=creds)
# Retrieve the documents contents from the Docs service.
document = service.documents().get(documentId=config.documentid).execute()
if os.path.exists(config.data_file):
os.remove(config.data_file)
with open(config.data_file, 'w') as f:
json.dump(document, f, ensure_ascii=False)
f.close()
def read_parse_data():
with open(config.data_file, 'r') as f:
data = json.loads(f.read())
f.close()
return data
def get_about_replacements() -> dict:
helper = Helper()
document = read_parse_data()
info = []
element = helper.get_table_element()
try:
count = document['body']["content"][element]["table"]["rows"]
except (IndexError, KeyError):
element = helper.find_with_table(document)
if element:
count = document['body']["content"][element]["table"]["rows"]
else:
info = helper.find_with_text(document)
date = helper.get_date(document)
another_teacher = helper.teacher(document)
if element:
for c in range(0, count):
more_replaces = (document['body']
["content"][element]["table"]
["tableRows"][c]["tableCells"][1]
["content"]
)
replaces = ''
for i in range(0, len(more_replaces)):
replaces += (document['body']["content"][element]["table"]
["tableRows"][c]["tableCells"][1]
["content"][i]["paragraph"]["elements"][0]
["textRun"]["content"].rstrip("\n"))
info.append(
(
document['body']["content"][element]["table"]
["tableRows"][c]["tableCells"][0]
["content"][0]["paragraph"]["elements"][0]
["textRun"]["content"].rstrip("\n"),
replaces
)
)
return {
'date': date if type(date) != type(False) else "Error" ,
'data': dict(info),
'another_teacher': another_teacher,
}

View File

@ -0,0 +1,188 @@
import os
import datetime
from datetime import datetime as dt
from load import config
def date_parser_helper(days:int, parse:str="%d.%m.20%y"):
return dt.strftime(
dt.now() +
datetime.timedelta(days=days),
parse
)
'''
self.months = {
1: "січень",
2: "лютий",
3: "березень",
4: "квітень",
5: "травень",
6: "червень",
7: "липень",
8: "серпень",
9: "вересень",
10: "жовтень",
11: "листопад",
12: "грудень"
}
'''
class Helper():
def __init__(self):
self.date_now = date_parser_helper(0)
self.date_next = date_parser_helper(1)
self.weekend_pass = date_parser_helper(2)
self.two_day_pass = date_parser_helper(3)
self.black_list = [
'черговий викладач',
self.date_now,
self.date_next,
self.weekend_pass,
self.two_day_pass
]
@staticmethod
def find_with_table(document):
c_element = 2
while True:
try:
document['body']["content"][c_element]["table"]["rows"]
break
except KeyError:
c_element += 1
if c_element > 15:
return False
except IndexError:
return False
with open("{}/table_element.txt".format(config.config_folder), 'w') as f:
f.write(str(c_element))
f.close()
return c_element
def find_with_text(self, document):
format_charset = '-'
alternative_format_charset = "\t"
element = 4
data = []
text = ''
while element < 15:
doc = (
document['body']["content"][element]
["paragraph"]["elements"][0]["textRun"]["content"]
).rstrip("\n").replace("", "-", 1)
if (
(
("-" in doc)
#and
#("\t" not in doc)
)
and
([p not in doc.lower() for p in self.black_list][0])
):
try:
group, text = doc.split(format_charset)
except ValueError:
if element > 6:
break
else:
try:
group, text = doc.split(alternative_format_charset)
except ValueError:
if element > 6:
break
if text != '':
data.append(
(group.strip(" "), text.lstrip(" ").replace("\t", ""))
)
element += 1
return data
def get_date(self, document):
date_element = 1
while date_element < 16:
try:
date = (
document['body']["content"][date_element]
["paragraph"]["elements"][0]["textRun"]["content"]
.rstrip(" \n"))
except:
date_element += 1
if (
(
(
self.date_now in date.lower()
.lstrip("заміни").lstrip("на").replace(" ", "")
)
or
(
self.date_next in date.lower()
.lstrip("заміни").lstrip("на").replace(" ", "")
)
or
(
self.weekend_pass in date.lower()
.lstrip("заміни").lstrip("на").replace(" ", "")
)
or
(
self.two_day_pass in date.lower()
.lstrip("заміни").lstrip("на").replace(" ", "")
)
)
or
(
"заміни на" in date.lower()
)
):
return date
else:
date_element += 1
return False
@staticmethod
def get_table_element():
if os.path.exists(f"{config.config_folder}/table_element.txt"):
element = int(
open(
f"{config.config_folder}/table_element.txt",
'r'
)
.read()
)
else:
element = 6
return element
@staticmethod
def teacher(document):
element = 1
while element < 6:
if "paragraph" in document['body']["content"][element]:
length_element = (len(document['body']["content"][element]
["paragraph"]["elements"]))
doc = (
document['body']["content"][element]["paragraph"]["elements"]
[0]["textRun"]["content"].rstrip("\n")
)
if 'черговий викладач' in doc.lower().replace("", ""):
return doc
elif length_element > 1:
for p in range(length_element):
doc = (
document['body']["content"][element]
["paragraph"]["elements"]
[p]["textRun"]["content"].rstrip("\n")
)
if 'черговий викладач' in doc.lower().replace("", ""):
return doc
element += 1

View File

@ -0,0 +1,9 @@
google-api-python-client
google-auth-httplib2
google-auth-oauthlib
peewee
aiogram
cryptography
pymysqldb
psycopg2
aioschedule

View File

@ -0,0 +1,9 @@
#!/usr/bin/env python3
'''
Don`t move this file!
'''
if __name__ == '__main__':
from parser import docs_parse
docs_parse()

69
bot.py Executable file
View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
import sys
import logging
import asyncio
from aiogram import executor
import filters
import handlers
from utils.announcements import scheduler
from load import dp, bot, config
from utils import set_commands
if (len(sys.argv) >= 2) and (sys.argv[1] == '-u'):
from parser import docs_parse; docs_parse()
sys.exit(0)
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger("Bot")
WEBAPP_HOST = config.bot("ip")
WEBAPP_PORT = config.bot("port")
WEBHOOK_HOST = f'http://{WEBAPP_HOST}:{WEBAPP_PORT}'
WEBHOOK_PATH = f'/bot{config.bot("token")}/'
WEBHOOK_URL = f"{WEBHOOK_HOST}{WEBHOOK_PATH}"
async def on_startup(dp):
await set_commands(dp)
await bot.set_webhook(url=WEBHOOK_URL)
async def on_shutdown(dp):
await bot.delete_webhook()
def main() -> None:
if config.logging_user:
logger.info("Logging enabled!")
else:
logger.info("Logging disabled!")
#loop = asyncio.get_event_loop()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
#loop.create_task(scheduler())
if config.bot("use_webhook").lower() in ['t', 'true', '1', 'yes', 'y']:
executor.start_webhook(
dispatcher=dp,
loop=loop,
webhook_path=WEBHOOK_PATH,
on_startup=on_startup,
skip_updates=False,
on_shutdown=on_shutdown,
host=WEBAPP_HOST,
port=WEBAPP_PORT,
)
else:
executor.start_polling(dp, skip_updates=True)
if __name__ == '__main__':
main()

1
configs/__init__.py Normal file
View File

@ -0,0 +1 @@
from .configure import Configure

32
configs/configure.py Normal file
View File

@ -0,0 +1,32 @@
from configparser import ConfigParser
from .module import Config
CONFIG_FILE = 'config.ini'
class Configure(Config):
def __init__(self):
self.config = ConfigParser()
self.data = dict()
self.__readconfig()
def __readconfig(self):
self.config.read(CONFIG_FILE)
for section in self.config.sections():
self.data[section] = dict()
for (key, value) in self.config.items(section):
self.data[section][key] = value
def bot(self, key):
return self.data["Bot"][key]
def db(self, key):
return self.data["DataBase"][key]
def anons(self, key):
return self.data["announcements"][key]

49
configs/module.py Normal file
View File

@ -0,0 +1,49 @@
class Config():
@property
def config_folder(self):
return self.config.get("Docs_Settings", "Config_folder").rstrip("/")
@property
def documentid(self):
return self.config.get("Docs_Settings", 'Document_ID')
@property
def token_file(self):
file = self.config.get("Docs_Settings", "token_file")
return (self.config_folder + "/" + file)
@property
def data_file(self):
file = self.config.get("Docs_Settings", "data_file")
return (self.config_folder + "/" + file)
@property
def credentials_file(self):
file = self.config.get("Docs_Settings", "credentials_file")
return (self.config_folder + "/" + file)
@property
def allowed_users(self):
usrs = self.config.get("Users", "allowed_users").split(',')
return [int(user_id) for user_id in usrs]
@property
def admin_user(self):
usrs = self.config.get("Users", "admin_users").split(',')
return [int(user_id) for user_id in usrs]
@property
def telegram_bot_api_server(self):
server = self.config.get("Bot", "telegram_bot_api_server")
if str(server).lower() == "none":
return "https://api.telegram.org"
else:
return server
@property
def logging_user(self):
o = self.config.get("DataBase", "enable_logging")
if o.lower() in ['t', "yes", "true"]:
return True
return False

2
database/__init__.py Normal file
View File

@ -0,0 +1,2 @@
__all__ = ["register", "get_all_users"]
from .worker import register, get_all_users, set_group_settings, get_group

26
database/models.py Normal file
View File

@ -0,0 +1,26 @@
from datetime import datetime
from peewee import Model, BigIntegerField, CharField, DateTimeField
from load import db
class Users(Model):
user_id = BigIntegerField(null=False)
first_name = CharField(null=True)
last_name = CharField(null=True)
username = CharField(null=True)
date = DateTimeField(default=datetime.now)
class Meta:
database = db
class Chat(Model):
chat_id = BigIntegerField(null=False)
group = CharField(null=False)
class Meta:
database = db

52
database/worker.py Normal file
View File

@ -0,0 +1,52 @@
from typing import Union, List
from .models import Users, Chat, db
db.create_tables([Users, Chat])
def register(
user_id: int,
username: Union[str, None],
first_name: str,
last_name: Union[str, None]
) -> None:
if not Users.select().where(Users.user_id == user_id).exists():
Users.create(
user_id=user_id,
username=username,
first_name=first_name,
last_name=last_name
)
else:
(Users.update(
username=username,
first_name=first_name,
last_name=last_name
)
.where(Users.user_id == user_id).execute())
def get_all_users() -> List[int]:
usr = []
for user in Users.select():
usr.append(user.user_id)
return usr
def set_group_settings(chat_id: int, group: Union[int, str]) -> None:
if Chat.select().where(Chat.chat_id == chat_id).exists():
Chat.update(group=group).where(Chat.chat_id == chat_id).execute()
else:
Chat.create(
chat_id=chat_id,
group=group
)
def get_group(chat_id: int) -> Union[str, None]:
if Chat.select().where(Chat.chat_id == chat_id).exists():
return Chat.get(Chat.chat_id == chat_id).group
return None

16
dockerfile Normal file
View File

@ -0,0 +1,16 @@
FROM alpine:latest
COPY . /usr/src/bot
WORKDIR /usr/src/bot
RUN apk update \
&& apk add \
build-base \
gcc \
musl-dev \
python3-dev \
py3-pip \
postgresql-dev
RUN pip install --no-cache-dir -r requirements.txt
CMD ["python3", "bot.py"]

91
engineering_works.py Normal file
View File

@ -0,0 +1,91 @@
#!/usr/bin/env python3
import logging
from aiogram import executor, types
from load import bot, dp, config
from database import get_all_users
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
WEBAPP_HOST = config.bot("ip")
WEBAPP_PORT = config.bot("port")
WEBHOOK_HOST = f'http://{WEBAPP_HOST}:{WEBAPP_PORT}'
WEBHOOK_PATH = f'/bot{config.bot("token")}/'
WEBHOOK_URL = f"{WEBHOOK_HOST}{WEBHOOK_PATH}"
engeneerings_works = (
"Техничиские работы..."
"Постараемся восстановить работу как можно раньше!"
)
parse_error = (
"Бот приостановлен на неопределенный срок!\n"
"Что случилось?\n"
"Администрация коледжа изменила формат файла с google docs на docx(Microsoft Office)\n"
"Замены вы можете посмотреть тут: https://docs.google.com/document/d/{}".format(config.documentid)
)
new_year = (
"С новым годом!❄️\n"
"Бот будет отключён до 16.01.2022(Период зимних каникул)\n"
)
link_replace = 'https://tfk.org.ua/zamini-do-rozkladu-08-51-30-03-02-2022/'
the_end =(
"Всё было восстановлено и настроено. Бот продолжает работу!:)"
)
send_msg = the_end
async def on_startup(dp):
await bot.set_webhook(url=WEBHOOK_URL)
async def on_shutdown(dp):
await bot.delete_webhook()
@dp.message_handler(commands=['send'])
async def asd(message):
for user_id in get_all_users():
if user_id != 1083440854:
print(user_id)
try:
await bot.send_message(chat_id=user_id, text=send_msg)
except:
pass
#@dp.message_handler()
async def start(message: types.Message):
logging.info(
"{} - {}".format(
message.from_user.id,
message.from_user.username
)
)
await bot.send_message(
message.chat.id,
engeneerings_works
)
if __name__ == "__main__":
if config.bot("use_webhook").lower() in ['t', 'true', '1', 'yes', 'y']:
executor.start_webhook(
dispatcher=dp,
webhook_path=WEBHOOK_PATH,
on_startup=on_startup,
skip_updates=True,
on_shutdown=on_shutdown,
host=WEBAPP_HOST,
port=WEBAPP_PORT,
)
else:
executor.start_polling(dp, skip_updates=True)

31
example_config.ini Normal file
View File

@ -0,0 +1,31 @@
[Docs_Settings]
Document_ID = 123ABC
Config_folder = configs/
token_file = token.json
credentials_file = credentials.json
data_file = data.json
[Bot]
token = 123:JAKD
; None = Not used local telegram bot api server
; Example http://127.0.0.1:8888
telegram_bot_api_server = none
; True or False
use_webhook = false
ip = 127.0.0.1
port = 3001
; if you don`t use local TelegramBotAPI Server + WebHooks -> Set settings in bot.py
[Users]
;Uncomment this variable, if you use filters to users
;allowed_users = 0,1,2,3
admin_users = 0,1,2,3
[DataBase]
enable_logging = yes
database_link = sqlite:///db.sqlite3
[announcements]
;Seconds only/
time = 14400
;////////////

7
filters/__init__.py Normal file
View File

@ -0,0 +1,7 @@
from load import dp
from .main import BotAdmin #OnlyMy
if __name__ == "filters":
dp.filters_factory.bind(BotAdmin)
#dp.filters_factory.bind(OnlyMy)

36
filters/main.py Normal file
View File

@ -0,0 +1,36 @@
from aiogram import types
from aiogram.dispatcher.filters import BoundFilter
from load import config
'''
class OnlyMy(BoundFilter):
key = 'only_my'
def __init__(self, only_my):
self.onlymy = only_my
async def check(self, message: types.Message):
logging.info("User: {user_id} - {username}".format(
user_id=str(message.from_user.id),
username=str(message.from_user.username)
))
if message.from_user.id in config.allowed_users:
return True
return False
'''
class BotAdmin(BoundFilter):
key = 'admin'
def __init__(self, admin):
self.admin = admin
async def check(self, message: types.Message):
if message.from_user.id in config.admin_user:
return True
else:
await message.answer("Хорошая попытка, но ты не администратор!")
return False

4
handlers/__init__.py Normal file
View File

@ -0,0 +1,4 @@
from . import groups
from . import private
from . import callback
from . import errors

View File

@ -0,0 +1 @@
from . import main

56
handlers/callback/main.py Normal file
View File

@ -0,0 +1,56 @@
import logging
from aiogram import types
from load import dp
from keyboards.inline.keyboard import cancel_button, menu
from parser import get_about_replacements
@dp.callback_query_handler(lambda c: c.data != "back")
async def callback_query(query: types.CallbackQuery):
from_user = query.from_user
data = get_about_replacements()
group = query.data
logging.info("Button: {btn}, User: {user_id} - {username}".format(
user_id=str(from_user.id),
username=str(from_user.username),
btn=str(group)
))
if group in data['data']:
await query.message.edit_text(
text="Группа: {group}\nЗамены: {replace}"
.format(
group=str(group),
replace=data['data'][group]
),
reply_markup=cancel_button
)
else:
await query.message.edit_text(
text=(
"Группа: {group} не найдена!\n"
"Список обновится автоматически после нажатия кнопки ниже"
)
.format(
group=str(group),
),
reply_markup=cancel_button
)
#await query.answer()
@dp.callback_query_handler(lambda c: c.data == "back")
async def back_button(query: types.CallbackQuery):
data = get_about_replacements()
await query.message.edit_text(
"{date}\n{teacher}\nВыберите свою группу"
.format(
date=data["date"],
teacher=data["another_teacher"]
),
reply_markup=menu(data["data"])
)
await query.answer()

View File

@ -0,0 +1 @@
from . import main

11
handlers/errors/main.py Normal file
View File

@ -0,0 +1,11 @@
import logging
from aiogram.utils.exceptions import BotBlocked
from load import dp
@dp.errors_handler()
async def errors_handler(update, exception):
if isinstance(exception, BotBlocked):
logging.info("Bot blocked")
return True

View File

@ -0,0 +1 @@
from . import main

64
handlers/groups/main.py Normal file
View File

@ -0,0 +1,64 @@
import logging
from aiogram import types
from aiogram.dispatcher.filters import ChatTypeFilter
from load import dp, bot, config
from database import set_group_settings, get_group
from parser import get_about_replacements
from database import register
@dp.message_handler(ChatTypeFilter(['group', 'supergroup']), commands=['set'])
async def set_group(message: types.Message):
if (message.from_user.id not in [admin.user.id for admin in await bot.get_chat_administrators(message.chat.id)]) and (message.from_user.id not in config.admin_user):
await message.answer("Вы не являетесь администратором чата!")
return
args = message.text.split()
if len(args) < 2:
await message.answer(
("Вы не передали имя своей группы!\n"
"Пример: /set 221")
)
return
set_group_settings(message.chat.id, args[1])
await message.answer("Настройка завершена успешно!")
@dp.message_handler(ChatTypeFilter(['group', 'supergroup']), commands=['start', 'get'])
async def get_replace_on_chat(message: types.Message):
if config.logging_user:
register(
user_id=message.from_user.id,
username=message.from_user.username,
first_name=str(message.from_user.first_name),
last_name=message.from_user.last_name
)
logging.info("User: {user_id} - {username}".format(
user_id=str(message.from_user.id),
username=str(message.from_user.username)
))
data = get_about_replacements()
group = get_group(message.chat.id)
if group is not None:
if group in data['data']:
await message.answer(
(
"Группа: {group}\n"
"Замены {date}\n"
"{teacher}\n"
"Замены: {replace}\n"
).format(
group=str(group),
replace=data['data'][group],
date=data["date"].lower(),
teacher=data["another_teacher"]
)
)
else:
await message.answer("Похоже замен нет")
else:
await message.answer("Похоже администратор группы не настроил привязку")

View File

@ -0,0 +1,2 @@
from . import main
from . import admin

23
handlers/private/admin.py Normal file
View File

@ -0,0 +1,23 @@
import logging
from aiogram import types
from load import dp, bot
from parser import docs_parse
@dp.message_handler(admin=True, commands=['reload'])
async def refresh(message: types.Message):
m = await bot.send_message(
message.chat.id,
"Идёт обновление информации..."
)
try:
docs_parse()
await m.edit_text(
"Информация о заменах была обновлена!"
)
except Exception as e:
logging.error(e)
await m.edit_text(
"Произойшла ошибка!"
)

82
handlers/private/main.py Normal file
View File

@ -0,0 +1,82 @@
import logging
from aiogram import types
from aiogram.dispatcher.filters import ChatTypeFilter
from load import dp, bot, config
from keyboards.inline.keyboard import menu
from parser import get_about_replacements
if config.logging_user:
from database import register
@dp.message_handler(commands=["help"])
async def help_msg(message: types.Message):
await bot.send_message(
message.chat.id,
(
"Я всего-лишь небольшой помощник:3\n"
"Умею работать в чатах, для настройки попросите администратора чата указать группу с помощью команды /set\n"
"/set - Установить группу, для получения данных о заменах(Работает ТОЛЬКО в чатах)\n"
"/start /get - получить информацию о заменах\n"
)
)
@dp.message_handler(ChatTypeFilter(['private']), commands=['start', 'get'])
async def get_replace(message: types.Message):
if config.logging_user:
register(
user_id=message.from_user.id,
username=message.from_user.username,
first_name=str(message.from_user.first_name),
last_name=message.from_user.last_name
)
link = (
'<a href="{}">Проверьте замены тут</a>'
.format(config.bot("link"))
)
logging.info("User: {user_id} - {username}".format(
user_id=str(message.from_user.id),
username=str(message.from_user.username)
))
try:
data = get_about_replacements()
await bot.send_message(
message.chat.id,
"Замены {date}\n{teacher}\nВыберите свою группу"
.format(
date=data["date"].lower(),
teacher=str(data["another_teacher"]).title()
),
reply_markup=menu(data["data"])
)
except Exception as e:
logging.error(str(e))
err_msg = (
"Техничиские шоколадки... "
f"Скорее всего структура файла была изменена\n{link}"
)
await bot.send_message(
message.chat.id,
err_msg,
parse_mode='HTML',
disable_web_page_preview=True
)
@dp.message_handler(commands=['link'])
async def get_link(message: types.Message):
msg = (
'<a href="{}">Проверьте замены тут</a>'
.format(config.bot("link"))
)
await bot.send_message(
message.chat.id,
msg,
parse_mode='HTML',
disable_web_page_preview=True
)

2
keyboards/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from . import inline
from . import default

View File

View File

@ -0,0 +1 @@
from . import keyboard

View File

@ -0,0 +1,21 @@
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
def menu(data: dict) -> InlineKeyboardMarkup:
markup = InlineKeyboardMarkup()
for k in data:
if k.replace(" ", "") != "":
markup.add(
InlineKeyboardButton(
k, callback_data=str(k)
)
)
return markup
cancel_button = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton("Назад", callback_data="back")
]
]
)

16
load.py Normal file
View File

@ -0,0 +1,16 @@
from aiogram import Dispatcher, Bot
from aiogram.bot.api import TelegramAPIServer
from playhouse.db_url import connect
from configs import Configure
config = Configure()
db = connect(config.db("db_link"))
bot = Bot(
token=config.bot("token"),
server=TelegramAPIServer.from_base(config.telegram_bot_api_server)
)
dp = Dispatcher(bot)

2
parser/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from .parser import get_about_replacements, docs_parse
__all__ = ['get_about_replacements', 'docs_parse']

63
parser/parser.py Normal file
View File

@ -0,0 +1,63 @@
import requests
import json
import datetime
from datetime import datetime as dt
from bs4 import BeautifulSoup
try:
from load import config
except: config = None
from .utils import *
headers = {
'user-agent':(
"Mozilla/5.0 (Windows NT 10.0; WOW64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/62.0.3202.9 Safari/537.36"
)
}
def date_parser_helper(days:int, parse:str="%d.%m.20%y"):
return dt.strftime(
dt.now() +
datetime.timedelta(days=days),
parse
)
def docs_parse():
output = {
"data":{},
"another_teacher":None
}
page = requests.get(config.bot("link"), headers=headers)
page.encoding = 'utf-8'
soup = BeautifulSoup(page.text, "html.parser")
# Это в идеале нужно переписать...
try: output = table_parser(soup, output); #print(output)
except Exception: pass
try: output = one_parser(soup, output); #print(output)
except Exception: pass
try: output = parser_two(soup, output); print(output)
except Exception as e: raise(e)
#try: output = parser3(soup, output); print(output)
#except Exception as e: raise(e)
with open(config.data_file, 'w') as f:
json.dump(output, f, ensure_ascii=False)
f.close()
def get_about_replacements() -> dict:
with open(config.data_file, 'r') as f:
data = json.loads(f.read())
f.close()
return data

68
parser/utils.py Normal file
View File

@ -0,0 +1,68 @@
def table_parser(soup, output):
#Date parser
date = (soup.find("main").findAll('span', style="color:black"))[1]
output["date"] = date.text.replace(u'\xa0', u'')
#Replaces parser
replaces = soup.findAll('tr')
for data in replaces:
text = (
data.find("td", valign="top")
.find("span", style="color:black")
.text.replace(u'\xa0', u'')
)
group = (
data.find("span", style="color:black")
.text.replace(" ", "").replace(u'\xa0', u''))
output["data"][group] = text
return output
def one_parser(soup, output):
raw_data = soup.find("main").findAll("p")
date = (
raw_data[3].find("span", style="font-size:16px;").b.text.lower()
.replace(u"\xa0", u"").replace("на", "").replace("\r", "")
.replace("ЗАМІНИ ДО РОЗКЛАДУ".lower(), "").split("\n")
)
output["date"] = date[1].lstrip(" ")
for p in raw_data[4].find("span",style="font-size:16px;").b.text.replace(u"\xa0", u"").split("\n"):
p = p.lstrip(" ")
data_rep = (p.lstrip(" ").split(" ", 1))
group = data_rep[0]
text = data_rep[1].replace("\r", "").lstrip(" ")
output["data"][group] = text
return output
def parser_two(soup, output):
raw_data = soup.find("main").findAll("p")[2]
data = raw_data.text.split("\n")
output["date"] = data[1].replace("\r", "")
for p in data[3:]:
r_data = p.split(maxsplit=1)
try:
group = r_data[0].replace(u"\xa0", u"").replace("\r", "")
text = r_data[1].replace(u"\xa0", u"").replace("\r", "")
except IndexError: break
output["data"][group] = text
return output
def parser3(soup, output):
raw_data = soup.find("main").findAll("p")
output["date"] = (
raw_data[2].text
.replace("\r", "")
.replace("ЗАМІНИ НА", "").lstrip(" ").rstrip(" ").lower()
)
for p in raw_data[5:]:
r_data = p.text.split("-", maxsplit=1)
group = r_data[0]
text = r_data[1]
output["data"][group] = text
return output

10
requirements.txt Normal file
View File

@ -0,0 +1,10 @@
#google-api-python-client
#google-auth-httplib2
#google-auth-oauthlib
bs4
peewee
aiogram
cryptography
pymysqldb
psycopg2
aioschedule

1
utils/__init__.py Normal file
View File

@ -0,0 +1 @@
from .bot_commands import set_commands

27
utils/announcements.py Normal file
View File

@ -0,0 +1,27 @@
import datetime
import asyncio
import aioschedule as schedule
from load import dp, config
from parser import docs_parse
async def announce():
date_now = datetime.datetime.today().weekday()
if (date_now == 5) or (date_now == 6): return
message = "Замены были обновлены, возможно появились изменения!)"
try:
docs_parse()
except Exception:
message = "Ошибка обновления данных!"
if config.admin_user is not None:
for user_id in config.admin_user:
await dp.bot.send_message(user_id, message)
async def scheduler():
schedule.every(int(config.anons('time'))).seconds.do(announce)
while True:
await schedule.run_pending()
await asyncio.sleep(5)

10
utils/bot_commands.py Normal file
View File

@ -0,0 +1,10 @@
from aiogram import types
async def set_commands(dp):
await dp.bot.set_my_commands([
types.BotCommand("start", "получить список замен"),
types.BotCommand("help", "информация"),
types.BotCommand("link", "получить ссылку на файл"),
types.BotCommand("reload", "только для администрации"),
])