-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
112 lines (88 loc) · 3.68 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#from sre_parse import CATEGORIES
from typing import Dict, List
import telegram
from amazon_api import search_items
from create_messages import create_item_html
import time
from datetime import datetime
import random
from consts import *
import logging
logging.basicConfig(level=logging.INFO)
# ****** Author: Paolo Francioso ********
def is_active() -> bool:
    """Tell whether the bot may post at the current local time.

    Returns:
        True when the current hour is strictly greater than MIN_HOUR and
        strictly less than MAX_HOUR, False otherwise.
    """
    current_hour = datetime.now().hour
    # Both bounds are exclusive: the hours MIN_HOUR and MAX_HOUR themselves
    # count as inactive.
    if not MIN_HOUR < current_hour:
        return False
    return current_hour < MAX_HOUR
def send_consecutive_messages(list_of_struct: list, number_of_messages: int = 1) -> list:
    """Send the leading text/markup pairs to the channel and return the rest.

    ``list_of_struct`` is read as a flat sequence of alternating entries: the
    element at an even index is passed as the message ``text`` and the element
    right after it as that message's ``reply_markup``.

    Args:
        list_of_struct: Flat list of alternating text / reply-markup entries.
        number_of_messages: How many pairs to send from the front of the list.
            Zero or a negative value sends nothing.

    Returns:
        The entries of ``list_of_struct`` located after the pairs that were
        sent (the whole list when nothing was sent).

    Raises:
        IndexError: If the list holds fewer than ``number_of_messages`` pairs.
        Exception: Whatever ``bot.send_message`` raises is propagated as is;
            pairs sent before the failure are not rolled back.

    NOTE(review): this function reads the module-level names ``bot`` and
    ``CHANNEL_NAME``; ``bot`` is only bound when the file runs as a script.
    """
    pairs_to_send = max(number_of_messages, 0)
    for pair_index in range(pairs_to_send):
        text_position = pair_index * 2
        bot.send_message(
            chat_id=CHANNEL_NAME,
            text=list_of_struct[text_position],
            reply_markup=list_of_struct[text_position + 1],
            parse_mode=telegram.ParseMode.HTML,
        )
    # The offset is derived from the count, not from a loop variable, so a
    # call that sends nothing returns the list instead of failing on an
    # unbound name.
    return list_of_struct[pairs_to_send * 2:]
def _collect_items(categories: Dict[str, List[str]]) -> list:
    """Fetch search results for every keyword of one randomly chosen category."""
    category = random.choice(list(categories))
    # Work on a shuffled copy so the caller's keyword lists keep their order.
    keywords = random.sample(categories[category], k=len(categories[category]))
    collected = []
    for keyword in keywords:
        # Pages 1 .. MAX_PAGE_SEARCH - 1 are requested (upper bound excluded).
        for page in range(1, MAX_PAGE_SEARCH):
            items = search_items(keyword, category, item_page=page)
            # api time limit for another http request is 1 second
            time.sleep(1)
            if items is not None:
                collected.extend(items)
    return collected


# run bot function
def run_bot(bot: telegram.Bot, categories: Dict[str, List[str]]) -> None:
    """Fetch items and post them to the channel in batches until an error occurs.

    Each cycle picks one random category, searches all of its keywords,
    shuffles the results, turns them into messages with ``create_item_html``
    and sends them ``NUMBER_OF_MESSAGES`` at a time, sleeping
    ``PAUSE_MINUTES`` minutes after each successful batch. While
    ``is_active()`` is False the loop sleeps five minutes between checks.
    When fewer than one full batch remains, a new cycle starts.

    Args:
        bot: Telegram bot instance.
            NOTE(review): not used directly in this function; sending goes
            through ``send_consecutive_messages``, which reads the
            module-level ``bot``.
        categories: Mapping of category name to its list of search keywords.

    Returns:
        None. The function returns only after an exception outside the send
        step (e.g. while searching or building messages) has been logged.
    """
    # One batch consumes two list entries per message (text + reply markup).
    batch_size = NUMBER_OF_MESSAGES * 2
    # start loop
    while True:
        try:
            items_full = _collect_items(categories)
            logging.info(f'{5 * "*"} Requests Completed {5 * "*"}')

            # shuffling results times
            random.shuffle(items_full)

            # creating html message, you can find more information in create_messages.py
            res = create_item_html(items_full, False)

            # while at least one full batch is left in the list
            while len(res) >= batch_size:
                if not is_active():
                    logging.info(
                        f'{5 * "*"} Inactive Bot, between {MIN_HOUR}AM and {MAX_HOUR}PM {5 * "*"}'
                    )
                    time.sleep(60 * 5)
                    continue

                try:
                    logging.info(f'{5 * "*"} Sending posts to channel {5 * "*"}')
                    res = send_consecutive_messages(res, NUMBER_OF_MESSAGES)
                except Exception:
                    # Best effort: drop the batch that failed and move on
                    # to the next one without pausing.
                    logging.exception("Sending posts failed; skipping this batch")
                    res = res[batch_size:]
                    continue

                # Sleep for PAUSE_MINUTES
                time.sleep(60 * PAUSE_MINUTES)
        except Exception:
            # Any failure outside the send step ends the bot loop; keep the
            # traceback so the cause is visible in the log.
            logging.exception("Bot loop stopped by an unexpected error")
            break
if __name__ == "__main__":
    # Script entry point. The client is bound to the module-level name `bot`
    # (send_consecutive_messages reads that global) before the loop starts.
    bot = telegram.Bot(token=TOKEN)
    run_bot(bot, CATEGORIES)