Fixed first database creation, added default values for some get's
This commit is contained in:
@@ -15,7 +15,7 @@ async def on_global_settings_kb_open(call: types.CallbackQuery):
|
||||
|
||||
|
||||
@wrap_exception()
|
||||
@throttle(cooldown=60*60, admin_ids=db[DBTables.config].get('admins'), by_id=False)
|
||||
@throttle(cooldown=60*60, admin_ids=db[DBTables.config].get('admins', []), by_id=False)
|
||||
async def on_set_model(call: types.CallbackQuery):
|
||||
from bot.keyboards.set_model import get_set_model_keyboard
|
||||
from bot.modules.api.models import get_models
|
||||
|
||||
@@ -17,9 +17,15 @@ fernet = Fernet(
|
||||
)
|
||||
|
||||
|
||||
def encrypt(s: str) -> bytes:
|
||||
return fernet.encrypt(s.encode())
|
||||
def encrypt(s: str) -> bytes | None:
|
||||
try:
|
||||
return fernet.encrypt(s.encode())
|
||||
except TypeError:
|
||||
return None
|
||||
|
||||
|
||||
def decrypt(s: bytes) -> str:
|
||||
return fernet.decrypt(s).decode()
|
||||
def decrypt(s: bytes) -> str | None:
|
||||
try:
|
||||
return fernet.decrypt(s).decode()
|
||||
except TypeError:
|
||||
return None
|
||||
|
||||
@@ -7,7 +7,7 @@ from bot.utils.cooldown import throttle
|
||||
|
||||
@throttle(5)
|
||||
async def set_endpoint(message: types.Message, is_command: bool = True):
|
||||
if message.from_id not in db[DBTables.config].get('admins') and message.from_id != ADMIN:
|
||||
if message.from_id not in db[DBTables.config].get('admins', []) and message.from_id != ADMIN:
|
||||
await message.reply('❌ You are not permitted to do that. '
|
||||
'It is only for this bot instance maintainers and admins')
|
||||
return
|
||||
@@ -46,8 +46,8 @@ async def add_whitelist(message: types.Message, is_command: bool = True):
|
||||
if not isinstance(db[DBTables.config].get('whitelist'), list):
|
||||
db[DBTables.config]['whitelist'] = list()
|
||||
|
||||
if ID not in db[DBTables.config].get('whitelist'):
|
||||
whitelist_ = db[DBTables.config].get('whitelist')
|
||||
if ID not in db[DBTables.config].get('whitelist', []):
|
||||
whitelist_ = db[DBTables.config].get('whitelist', [])
|
||||
whitelist_.append(ID)
|
||||
db[DBTables.config]['whitelist'] = whitelist_
|
||||
else:
|
||||
@@ -81,11 +81,11 @@ async def remove_whitelist(message: types.Message, is_command: bool = True):
|
||||
if not isinstance(db[DBTables.config].get('whitelist'), list):
|
||||
db[DBTables.config]['whitelist'] = list()
|
||||
|
||||
if ID not in db[DBTables.config].get('whitelist'):
|
||||
if ID not in db[DBTables.config].get('whitelist', []):
|
||||
await message.reply('❌ This whitelist is not added')
|
||||
return
|
||||
else:
|
||||
whitelist_ = db[DBTables.config].get('whitelist')
|
||||
whitelist_ = db[DBTables.config].get('whitelist', [])
|
||||
whitelist_.remove(ID)
|
||||
db[DBTables.config]['whitelist'] = whitelist_
|
||||
|
||||
@@ -101,7 +101,7 @@ async def get_whitelist(message: types.Message, is_command: bool = True):
|
||||
return
|
||||
|
||||
await message.reply(f"✅ Whitelisted ids: {db[DBTables.config].get('whitelist')}"
|
||||
if db[DBTables.config].get('whitelist') else
|
||||
if db[DBTables.config].get('whitelist', []) else
|
||||
'❌ Whitelist is disabled. Everyone can use the bot. Add someone to whitelist to enable it')
|
||||
|
||||
|
||||
@@ -123,8 +123,8 @@ async def add_admin(message: types.Message, is_command: bool = True):
|
||||
if not isinstance(db[DBTables.config].get('admins'), list):
|
||||
db[DBTables.config]['admins'] = list()
|
||||
|
||||
if ID not in db[DBTables.config].get('admins'):
|
||||
admins_ = db[DBTables.config].get('admins')
|
||||
if ID not in db[DBTables.config].get('admins', []):
|
||||
admins_ = db[DBTables.config].get('admins', [])
|
||||
admins_.append(ID)
|
||||
db[DBTables.config]['admins'] = admins_
|
||||
else:
|
||||
@@ -154,11 +154,11 @@ async def remove_admin(message: types.Message, is_command: bool = True):
|
||||
if not isinstance(db[DBTables.config].get('admins'), list):
|
||||
db[DBTables.config]['admins'] = list()
|
||||
|
||||
if ID not in db[DBTables.config].get('admins'):
|
||||
if ID not in db[DBTables.config].get('admins', []):
|
||||
await message.reply('❌ This admin is not added')
|
||||
return
|
||||
else:
|
||||
admins_ = db[DBTables.config].get('admins')
|
||||
admins_ = db[DBTables.config].get('admins', [])
|
||||
admins_.remove(ID)
|
||||
db[DBTables.config]['admins'] = admins_
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from bot.utils.cooldown import throttle
|
||||
|
||||
@throttle(5)
|
||||
async def on_off_call(message: types.Message, is_command=None):
|
||||
if message.from_id not in db[DBTables.config].get('admins') and message.from_id != ADMIN:
|
||||
if message.from_id not in db[DBTables.config].get('admins', []) and message.from_id != ADMIN:
|
||||
await message.reply('❌ You are not permitted to do that. '
|
||||
'It is only for this bot instance maintainers and admins')
|
||||
return
|
||||
|
||||
@@ -6,7 +6,7 @@ from bot.utils.cooldown import throttle
|
||||
|
||||
@throttle(5)
|
||||
async def resetqueue(message: types.Message, is_command: bool = True):
|
||||
if message.from_id not in db[DBTables.config].get('admins') and message.from_id != ADMIN:
|
||||
if message.from_id not in db[DBTables.config].get('admins', []) and message.from_id != ADMIN:
|
||||
await message.reply('❌ You are not permitted to do that. '
|
||||
'It is only for this bot instance maintainers and admins')
|
||||
return
|
||||
|
||||
@@ -6,7 +6,7 @@ from bot.modules.get_hash.get_hash import get_hash
|
||||
|
||||
|
||||
@wrap_exception([IndexError])
|
||||
@throttle(cooldown=5, admin_ids=db[DBTables.config].get('admins'))
|
||||
@throttle(cooldown=5, admin_ids=db[DBTables.config].get('admins', []))
|
||||
async def hash_command(message: types.Message):
|
||||
try:
|
||||
if not hasattr(message.reply_to_message, 'photo') or not hasattr(message.reply_to_message, 'document'):
|
||||
|
||||
@@ -6,6 +6,6 @@ from bot.keyboards.config import get_config_keyboard
|
||||
|
||||
|
||||
@wrap_exception()
|
||||
@throttle(cooldown=20, admin_ids=db[DBTables.config].get('admins'), by_id=False)
|
||||
@throttle(cooldown=20, admin_ids=db[DBTables.config].get('admins', []), by_id=False)
|
||||
async def config_command(message: types.Message):
|
||||
await message.reply("⚙️ Configuration:", reply_markup=get_config_keyboard(message.from_id))
|
||||
|
||||
@@ -6,7 +6,7 @@ from bot.utils.errorable_command import wrap_exception
|
||||
|
||||
|
||||
@wrap_exception([IndexError])
|
||||
@throttle(cooldown=10, admin_ids=db[DBTables.config].get('admins'))
|
||||
@throttle(cooldown=10, admin_ids=db[DBTables.config].get('admins', []))
|
||||
async def imginfo(message: types.Message):
|
||||
try:
|
||||
if not hasattr(message.reply_to_message, 'photo'):
|
||||
|
||||
@@ -11,20 +11,13 @@ async def start_command(message: types.Message):
|
||||
f'so we will check config for you now')
|
||||
if not isinstance(db[DBTables.config].get('admins'), list):
|
||||
db[DBTables.config]['admins'] = list()
|
||||
if ADMIN not in db[DBTables.config].get('admins'):
|
||||
admins_ = db[DBTables.config].get('admins')
|
||||
if ADMIN not in db[DBTables.config].get('admins', []):
|
||||
admins_ = db[DBTables.config].get('admins', [])
|
||||
admins_.append(ADMIN)
|
||||
db[DBTables.config]['admins'] = admins_
|
||||
await db[DBTables.config].write()
|
||||
await message.reply(f'✅ Added {message.from_user.username} to admins. You can add other admins, '
|
||||
f'check bot settings menu')
|
||||
if ADMIN not in db[DBTables.config].get('whitelist'):
|
||||
whitelist_ = db[DBTables.config].get('whitelist')
|
||||
whitelist_.append(ADMIN)
|
||||
db[DBTables.config]['whitelist'] = whitelist_
|
||||
await db[DBTables.config].write()
|
||||
await message.reply(f'✅ Added {message.from_user.username} to whitelist. You can add other users to whitelist, '
|
||||
f'check bot settings menu')
|
||||
if db[DBTables.config].get('enabled') is None:
|
||||
db[DBTables.config]['enabled'] = True
|
||||
await message.reply(f'✅ Generation is enabled now')
|
||||
|
||||
@@ -6,7 +6,7 @@ from bot.utils.errorable_command import wrap_exception
|
||||
|
||||
|
||||
@wrap_exception()
|
||||
@throttle(cooldown=5, admin_ids=db[DBTables.config].get('admins'))
|
||||
@throttle(cooldown=5, admin_ids=db[DBTables.config].get('admins', []))
|
||||
async def get_current(message: types.Message):
|
||||
prompt: Prompt = db[DBTables.prompts].get(message.from_id)
|
||||
if prompt is None:
|
||||
|
||||
@@ -7,7 +7,7 @@ from bot.utils.errorable_command import wrap_exception
|
||||
|
||||
|
||||
@wrap_exception()
|
||||
@throttle(cooldown=5*60, admin_ids=db[DBTables.config].get('admins'), by_id=False)
|
||||
@throttle(cooldown=5*60, admin_ids=db[DBTables.config].get('admins', []), by_id=False)
|
||||
async def set_model_command(message: types.Message):
|
||||
if db[DBTables.config].get('whitelist') and \
|
||||
(message.chat.id not in db[DBTables.config]['whitelist'] and
|
||||
|
||||
@@ -8,7 +8,7 @@ def get_config_keyboard(user_id: int) -> types.InlineKeyboardMarkup:
|
||||
types.InlineKeyboardButton("Prompt settings", callback_data="prompt_settings_kb"),
|
||||
types.InlineKeyboardButton("Global settings", callback_data="global_settings_kb")
|
||||
]
|
||||
if user_id in db[DBTables.config].get('admins'):
|
||||
if user_id in db[DBTables.config].get('admins', []):
|
||||
buttons.append(
|
||||
types.InlineKeyboardButton("Admin settings", callback_data="admin_settings_kb")
|
||||
)
|
||||
|
||||
@@ -4,6 +4,8 @@ from bot.db import db, DBTables, decrypt
|
||||
|
||||
async def ping():
|
||||
endpoint = decrypt(db[DBTables.config].get('endpoint'))
|
||||
if endpoint is None:
|
||||
return False
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
r = await session.head(endpoint)
|
||||
|
||||
Reference in New Issue
Block a user