This commit is contained in:
tema 2022-03-18 21:25:50 +02:00
commit 2baff4118e
Signed by: tema
GPG Key ID: 21FDB6D162488F6F
28 changed files with 573 additions and 0 deletions

14
.gitignore vendored Normal file
View File

@ -0,0 +1,14 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
venv/
*.txt
!requirements.txt
*.ini
!example_config.ini
test.py

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# clash-telegram-bot
Бот для верификации юзеров. Проверка состояния в клане clash of clans по средствам clash of clans API

43
bot.py Normal file
View File

@ -0,0 +1,43 @@
import logging
from aiogram import executor
from load import dp, config
import filters
import handlers
skip_updates=True
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
async def on_startup(dp):
webhook_url = "{host}{path}{token}".format(
host=config["WebHook"]["webhook_host"],
path=config["WebHook"]["webhook_path"],
token=config["Bot"]["token"]
)
await dp.bot.set_webhook(url=webhook_url)
async def on_shutdown(dp):
await dp.bot.delete_webhook()
if __name__ == "__main__":
if config["WebHook"]["use"].lower() in ["yes", "true"]:
executor.start_webhook(
dispatcher=dp,
skip_updates=skip_updates,
webhook_path="{}{}".format(config["WebHook"]["webhook_path"], config["Bot"]["token"]),
on_startup=on_startup,
on_shutdown=on_shutdown,
host=config["WebHook"]["app_host"],
port=config["WebHook"]["app_port"]
)
else:
executor.start_polling(dp, skip_updates=skip_updates)

6
coc/__init__.py Normal file
View File

@ -0,0 +1,6 @@
import imp
from .user import Player
from .clan import Clan
class ClashOfClans(Player, Clan):
pass

26
coc/base.py Normal file
View File

@ -0,0 +1,26 @@
import json
import requests
from abc import ABC
class Base(ABC):
def __init__(self, api_token:str, clan_tag:str = None) -> None:
self.headers = {
"Content-Type": "application/json",
"Authorization": "Bearer {api_token}".format(api_token=api_token)
}
self.clan_tag = clan_tag
def get(self, url: str, tag: str):
request = requests.get(
url.format(tag.replace("#", "")),
headers=self.headers
)
return json.loads(request.text)
def post(self, url: str, tag: str, payload: dict):
request = requests.post(
url=url.format(tag.replace("#", "")),
headers=self.headers,
data=json.dumps(payload)
)
return json.loads(request.text)

17
coc/clan.py Normal file
View File

@ -0,0 +1,17 @@
from .base import Base
class Clan(Base):
def clan_members(self, clan_tag:str = None):
if self.clan_tag is not None:
clan_tag = self.clan_tag
url = "https://api.clashofclans.com/v1/clans/%23{}/members"
return self.get(url, tag=clan_tag)
def get_clan(self, clan_tag:str = None):
if self.clan_tag is not None:
clan_tag = self.clan_tag
url = "https://api.clashofclans.com/v1/clans/%23{}"
return self.get(url, tag=clan_tag)

12
coc/user.py Normal file
View File

@ -0,0 +1,12 @@
from .base import Base
class Player(Base):
def get_player(self, tag: str):
url = "https://api.clashofclans.com/v1/players/%23{}"
return self.get(url, tag)
def verify_token(self, tag: str, token: str):
payload = {"token":token}
url = "https://api.clashofclans.com/v1/players/%23{}/verifytoken"
return self.post(url, tag, payload)

16
config.py Normal file
View File

@ -0,0 +1,16 @@
from configparser import ConfigParser
CONFIG_FILE ='./config.ini'
data = ConfigParser()
data.read(CONFIG_FILE)
config = dict()
for section in data.sections():
config[section] = dict()
for key, value in data.items(section):
config[section][key] = value

BIN
database.sqlite3 Normal file

Binary file not shown.

18
database/model.py Normal file
View File

@ -0,0 +1,18 @@
from peewee import Model, BigIntegerField, CharField
from load import db
class Users(Model):
user_id = BigIntegerField(null=False)
first_name = CharField(64)
last_name = CharField(64, null=True)
username = CharField(32, null=True)
tag = CharField()
nickname = CharField()
townhall = BigIntegerField()
attackwins = BigIntegerField()
class Meta:
database = db

57
database/worker.py Normal file
View File

@ -0,0 +1,57 @@
from aiogram import types
from .model import db, Users
db.create_tables([Users])
def register(user: types.User, data: dict):
if not Users.select().where(Users.user_id==user.id, Users.tag == data['tag']).exists():
Users.create(
user_id = user.id,
first_name = user.first_name,
last_name = user.last_name,
username = user.username,
tag = data['tag'],
nickname = data['name'],
townhall = data['townHallLevel'],
attackwins = data['attackWins']
)
else:
Users.update(
user_id = user.id,
first_name = user.first_name,
last_name = user.last_name,
username = user.username,
tag = data['tag'],
nickname = data['name'],
townhall = data['townHallLevel'],
attackwins = data['attackWins']
).where(Users.user_id == user.id, Users.tag == data['tag']).execute()
def delete(user_id: int):
Users.delete().where(Users.user_id == user_id).execute()
def check_register(user_id: int):
return Users.select().where(Users.user_id==user_id).exists()
def get_users():
users = []
for p in Users.select():
users.append(
{
'user_id': p.user_id,
'first_name': p.first_name,
'last_name': p.last_name,
'username': p.username,
'tag': p.tag,
'nickname': p.nickname,
'townhall': p.townhall,
'attackwins': p.attackwins
}
)
return users

26
example_config.ini Normal file
View File

@ -0,0 +1,26 @@
[Bot]
token = 123456789:ABCDEABCDEABCD
;; Token from @botfather
base_server = https://api.telegram.org
chat_id = -1001740735953
verify_timeout = 120
[WebHook]
use = false
webhook_host = http://127.0.0.1:3001
webhook_path = /bot
app_host = 127.0.0.1
app_port = 3001
[DataBase]
link = sqlite:///database.sqlite3
;; http://docs.peewee-orm.com/en/latest/peewee/playhouse.html#database-url
[API]
api_file = key.txt
;; file with clash of clans API Token
;; https://developer.clashofclans.com
clan_tag = #QWERTY1

5
filters/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from load import dp
from .main import ChatFilter
dp.filters_factory.bind(ChatFilter)

15
filters/main.py Normal file
View File

@ -0,0 +1,15 @@
from aiogram.dispatcher.filters import BoundFilter
from aiogram import types
from load import config
class ChatFilter(BoundFilter):
key = 'is_chat'
def __init__(self, is_chat):
self.is_chat = is_chat
async def check(self, message: types.Message):
if message.chat.id == int(config["Bot"]["chat_id"]): return True
return False

5
fsm/base.py Normal file
View File

@ -0,0 +1,5 @@
from aiogram.dispatcher.filters.state import State, StatesGroup
class Verifycation(StatesGroup):
tag = State()
token = State()

2
handlers/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from . import group
from . import private

View File

@ -0,0 +1,3 @@
from . import main
from . import users
from . import captcha

52
handlers/group/captcha.py Normal file
View File

@ -0,0 +1,52 @@
from aiogram import types
from aiogram.types.chat_permissions import ChatPermissions
from load import dp, bot, config
from keyboards.inline.keyboard import verify_button
from utils.timer import timer_manager, KickTimer
async def captcha_kick_user(message: types.Message, chat_id: int, user_id:int):
print("asdasdasd")
try:
await bot.ban_chat_member(chat_id=chat_id, user_id=user_id)
except Exception: pass
await bot.unban_chat_member(chat_id, user_id, only_if_banned=True)
await message.delete()
try:
await bot.send_message(user_id, "Время вышло!")
except Exception: pass
state = dp.current_state(chat=user_id, user=user_id)
await state.finish()
@dp.message_handler(is_chat=True, content_types=["new_chat_members"])
async def on_join(message: types.Message):
user_id = message.from_user.id
chat_id = message.chat.id
try:
await bot.restrict_chat_member(
chat_id=chat_id,
user_id=user_id,
permissions=ChatPermissions(can_send_messages=False)
)
except Exception: return
hello_message = (
"Привет, <a href='tg://user?id={user_id}'>{name}</a>! "
"Для того чтобы удостоверится что ты наш игрок, мы попросим пройти верификацию!\n"
"<b>У вас есть 2 минуты на прохождение верификации!</b>"
)
me = await bot.get_me()
captcha_message = await bot.send_message(
message.chat.id,
hello_message.format(user_id=user_id, name=message.from_user.first_name),
reply_markup=verify_button(message.from_user, me.username)
)
timer_manager.add_timer(user_id, captcha_message, KickTimer(int(config['Bot']['verify_timeout']), captcha_kick_user,
args=[captcha_message, chat_id, user_id]))
#Timer(int(config['Bot']['verify_timeout']), captcha_kick_user, args=[captcha_message, chat_id, user_id])
await message.delete()

12
handlers/group/main.py Normal file
View File

@ -0,0 +1,12 @@
from aiogram import types
from load import dp, bot, config
from database.worker import delete
@dp.message_handler(is_chat=True, content_types=['left_chat_member'])
async def on_left(message: types.Message):
user_id = message.from_user.id
delete(user_id)
await message.delete()

18
handlers/group/users.py Normal file
View File

@ -0,0 +1,18 @@
from aiogram import types
from load import bot, dp
from database.worker import get_users
@dp.message_handler(is_chat=True, commands=['get'])
async def get_users_info(message: types.Message):
text = ''
users = get_users()
if users is not None:
for p in users:
text += ("<a href='tg://user?id={user_id}'>{first_name}</a> ({nickname} <code>{tag}</code>)\n").format(
user_id=p['user_id'], first_name=p['first_name'], nickname=p['nickname'], tag=p['tag']
)
await bot.send_message(message.chat.id, text)
else:
await bot.send_message(message.chat.id, "Ничего не найдено!")

View File

@ -0,0 +1 @@
from . import register

View File

@ -0,0 +1,114 @@
from aiogram import types
from aiogram.types.chat_permissions import ChatPermissions
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters import ChatTypeFilter
from load import dp, bot, coc_api, config
from fsm.base import Verifycation
from database.worker import register, check_register
from utils.timer import timer_manager
@dp.message_handler(ChatTypeFilter(['private']), commands=['start'], state='*')
async def verify(message: types.Message):
status = await bot.get_chat_member(config['Bot']["chat_id"], message.from_user.id)
if status["status"] == "left":
await bot.send_message(message.chat.id, "Пошёл нахуй! В чат заходить не учили?")
return
if check_register(message.from_user.id):
await bot.send_message(message.chat.id, "Вы уже прошли верификацию!")
return
state = dp.current_state(chat=message.chat.id, user=message.from_user.id)
await state.finish()
await bot.send_message(message.chat.id, "Отправь мне свой тег")
await Verifycation.tag.set()
@dp.message_handler(state=Verifycation.tag)
async def tag_set(message: types.Message, state: FSMContext):
await state.update_data(tag=message.text)
await bot.send_message(
message.chat.id,
("Отправь мне свой токен авторизации\n"
"! Владелец или кто-либо не получает доступа к вашему акаунту !\n"
"<a href='https://developer.clashofclans.com/#/documentation'>"
"Если хотите убедиться в этом, можете посмотреть официальную API, которую использует данный бот</a>")
)
await Verifycation.token.set()
@dp.message_handler(state=Verifycation.token)
async def token_set(message: types.Message, state: FSMContext):
chat_id = config['Bot']["chat_id"]
await state.update_data(token=message.text)
info = await state.get_data()
await state.finish()
data = coc_api.verify_token(**info)
#{'tag': '#TAG', 'token': 'TOKEN', 'status': 'ok'}
members = coc_api.clan_members()
clan_member = False
for i in members['items']:
if data['tag'] in i['tag']:
clan_member = True
break
if clan_member == False:
await bot.send_message(
message.chat.id,
("Вы не являетесь учасником нашего клана! Ввойдите в клан, чтобы иметь доступ к чату\n"
"Вы были кикнуты с чата, вы можете перезайти и пройти проверку повторно!")
)
try:
await bot.ban_chat_member(chat_id, message.from_user.id)
except Exception: pass
await bot.unban_chat_member(chat_id, message.from_user.id, only_if_banned=True)
return
if (data['status'] == 'ok'):
await bot.send_message(message.chat.id, "Вы прошли проверку!")
register(message.from_user, coc_api.get_player(data['tag']))
group = await bot.get_chat(chat_id)
group_permissions = group["permissions"]
permissions = ChatPermissions(
can_send_messages= group_permissions["can_send_messages"],
can_send_media_messages= group_permissions["can_send_media_messages"],
can_send_polls= group_permissions["can_send_polls"],
can_send_other_messages= group_permissions["can_send_other_messages"],
can_add_web_page_previews= group_permissions["can_add_web_page_previews"],
can_change_info= group_permissions["can_change_info"],
can_invite_users= group_permissions["can_invite_users"],
can_pin_messages= group_permissions["can_pin_messages"]
)
try:
await bot.restrict_chat_member(
chat_id=chat_id,
user_id=message.from_user.id,
permissions=permissions
)
except Exception: pass
else:
await bot.send_message(
message.chat.id,
("Вы ввели неверный токен или тег!\n"
"Вы были кикнуты с чата, вы можете перезайти и пройти проверку повторно!")
)
try:
await bot.ban_chat_member(chat_id, message.from_user.id)
except Exception: pass
await bot.unban_chat_member(chat_id, message.from_user.id, only_if_banned=True)
m = timer_manager.cancel_timer(message.from_user.id)
if m is not None:
await m.delete()

1
keyboards/__init__.py Normal file
View File

@ -0,0 +1 @@
from . import inline

View File

View File

@ -0,0 +1,18 @@
from aiogram import types
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
def verify_button(user: types.User, bot_username: str) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
"Подтвердить",
url="https://t.me/{bot}?start={user_id}".format(
bot=bot_username,
user_id=user.id
)
)
]
]
)

26
load.py Normal file
View File

@ -0,0 +1,26 @@
from aiogram import Dispatcher, Bot
from aiogram.bot.api import TelegramAPIServer
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from playhouse.db_url import connect
from config import config
from coc import ClashOfClans
bot = Bot(
token=config["Bot"]["token"],
server=TelegramAPIServer.from_base(config["Bot"]["base_server"]),
parse_mode='HTML'
)
storage = MemoryStorage()
dp = Dispatcher(bot=bot, storage=storage)
db = connect(url=config["DataBase"]["link"])
coc_api = ClashOfClans(
api_token=open(config["API"]["api_file"], "r").read(),
clan_tag=config["API"]["clan_tag"]
)

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
aiogram==2.19
requests==2.27.1
peewee==3.14.10

60
utils/timer.py Normal file
View File

@ -0,0 +1,60 @@
import asyncio
class KickTimer:
def __init__(self, timeout, callback, args):
self._timeout = timeout
self._callback = callback
self._args = args
self._task = None
async def _job(self):
await asyncio.sleep(self._timeout)
await self._callback(*self._args)
await asyncio.sleep(5)
self._task.cancel()
async def _run_now(self):
await self._callback(*self._args)
await asyncio.sleep(5)
self._task.cancel()
def start(self):
self._task = asyncio.ensure_future(self._job())
def execute_now(self):
self._task = asyncio.ensure_future(self._run_now())
def cancel(self):
self._task.cancel()
class TimerManager:
def __init__(self):
self.timers = {}
def add_timer(self, user_id: int, message, timer: KickTimer):
if user_id not in self.timers:
timer.start()
self.timers[user_id] = [timer, message]
def cancel_timer(self, user_id):
if user_id in self.timers:
self.timers[user_id][0].cancel()
message = self.timers[user_id][1]
del self.timers[user_id]
print(f"Timer for user {user_id} canceled")
return message
def run_now(self, user_id):
if user_id in self.timers:
self.timers[user_id][0].execute_now()
message = self.timers[user_id][1]
del self.timers[user_id]
return message
timer_manager = TimerManager()