-
Notifications
You must be signed in to change notification settings - Fork 0
/
callmanager.py
346 lines (313 loc) · 16.5 KB
/
callmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
from pyrogram import Client
from pyrogram.errors.exceptions import BotMethodInvalid
from pyrogram.errors.exceptions.bad_request_400 import ChannelInvalid, ChannelPrivate, InviteHashExpired, PeerIdInvalid, UserAlreadyParticipant
from pyrogram.errors.exceptions.flood_420 import FloodWait
from pytgcalls import GroupCallFactory
import time
import os
from asyncio import QueueEmpty
import asyncio
from helpers.queues import queues
from utils.Logger import *
from utils.Config import Config
from utils.MongoClient import MongoDBClient
from pyrogram.raw.functions.phone import LeaveGroupCall
from pyrogram.raw.functions.channels import GetFullChannel
# Shared configuration accessor used by everything below.
config = Config()

# NOTE: this rebinds the imported class name ``MongoDBClient`` to an instance
# of it; from here on ``MongoDBClient`` refers to that instance.
MongoDBClient = MongoDBClient()

# Pyrogram client built from the USERBOT_SESSION / API_ID / API_HASH config
# values; every GoupCallInstance uses it as its ``client``.
user_app = Client(
    config.get('USERBOT_SESSION'),
    api_id=config.get('API_ID'),
    api_hash=config.get('API_HASH'),
)
class Singleton(type):
    """Metaclass that hands out one cached instance per class.

    The first call to a class using this metaclass constructs the instance
    with the supplied arguments; every later call returns that same object
    and ignores its arguments.
    """

    # Maps each class to the single instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of *cls*, creating it on first use."""
        cache = cls._instances
        if cls in cache:
            return cache[cls]
        cache[cls] = super().__call__(*args, **kwargs)
        return cache[cls]
class GoupCallInstance(object):
    """Playback controller for the voice chat of a single chat.

    Wraps a pytgcalls file group call created on the module-level
    ``user_app`` client and keeps the local bookkeeping that goes with it:
    the list of songs (currently playing song first), the active flag, a
    status string and the repeat counters.
    """

    def __init__(self, chat_id, mongo_doc, client=None):
        """Create the group-call wrapper and register its event handlers.

        Args:
            chat_id: Identifier of the chat this instance serves.
            mongo_doc: Document whose ``'_id'`` is passed to
                ``MongoDBClient.add_song_playbacks`` when a file changes.
            client: Optional bot client used by ``stopPlayBack`` to send the
                "playback ended" message.
        """
        self.client = user_app
        self.bot_client = client
        self.mongo_doc = mongo_doc
        self.chat_id = chat_id
        self.pytgcalls = GroupCallFactory(
            self.client, GroupCallFactory.MTPROTO_CLIENT_TYPE.PYROGRAM
        ).get_file_group_call()
        self.active = False
        # One of None (never started), "playing" or "stopped".
        self.status = None
        # Entries are dicts with "file", "song", "requested_by" and "time".
        self.songs = []
        self.playingFile = None
        # Total number of times a song should play; 1 means no looping.
        self.repeatCount = 1
        self.currentRepeatCount = 0

        @self.pytgcalls.on_playout_ended
        async def on_stream_end(context, *args) -> None:
            """Replay the current file while looping, otherwise skip ahead."""
            try:
                # NOTE(review): the value is unused, but the attribute access
                # can raise and abort the handler - confirm before removing.
                chat_id = context.full_chat.id
                logInfo(
                    f"Playout berakhir, melompat ke lagu berikutnya, file saat ini yang berakhir : {args}")
                self.currentRepeatCount += 1
                if self.repeatCount > 1 and self.currentRepeatCount < self.repeatCount:
                    logInfo(
                        f"Memulai ulang permainan : Loop : {self.repeatCount} ,Current Count : {self.currentRepeatCount}")
                    self.pytgcalls.restart_playout()
                    return
                await self.skipPlayBack()
            except Exception as ex:
                logException(f"Error in on_stream_end: {ex}", True)

        @self.pytgcalls.on_network_status_changed
        async def on_network_changed(context, is_connected: bool):
            """Mirror the connection state into ``active`` / ``status``."""
            try:
                logInfo(f"berubah status menjadi : {is_connected}")
                chat_id = context.full_chat.id
                if is_connected is True:
                    self.active = True
                    # Same value start_playback() uses, so the "playing"
                    # state has a single spelling.
                    self.status = "playing"
                else:
                    # fromHandler=True: clean up local state without trying
                    # to leave/stop the call again.
                    await self.stopPlayBack(fromHandler=True)
                logInfo(f"statusnya berubah : {chat_id} {is_connected}")
            except Exception as ex:
                logException(f"Kesalahan di on_network_changed : {ex}", True)

    def logInfo(self, msg):
        """Log *msg* at info level, prefixed with this instance's chat id."""
        return logInfo(f"{self.chat_id}=>{msg}")

    def logWarn(self, msg):
        """Log *msg* as a warning, prefixed with this instance's chat id."""
        return logWarning(f"{self.chat_id}=>{msg}")

    def logException(self, msg, send):
        """Log *msg* as an exception, prefixed with this instance's chat id.

        *send* is forwarded unchanged to the module-level ``logException``.
        """
        return logException(f"{self.chat_id}=>{msg}", send)

    async def changeFile(self, fileName, songInfo, requester, oldFileName=None):
        """Point the group call at *fileName* and record the playback.

        Resets the repeat counter, deletes *oldFileName* from disk when it is
        given and exists (failures are logged, not raised), and records the
        playback through ``MongoDBClient`` when its ``client`` is set.

        Raises:
            Exception: wrapping any error raised outside the file deletion.
        """
        try:
            self.currentRepeatCount = 0
            self.playingFile = fileName
            self.pytgcalls.input_filename = fileName
            try:
                if oldFileName is not None and os.path.exists(oldFileName):
                    os.remove(oldFileName)
            except Exception as ex:
                # Best effort: a leftover file must not stop the playback.
                self.logException(
                    f"Kesalahan saat menghapus file{oldFileName} : {ex}", True)
            if MongoDBClient.client is not None:
                MongoDBClient.add_song_playbacks(
                    songInfo, requester, self.mongo_doc['_id'])
            self.logInfo(f"Changed file to : {fileName}")
        except Exception as ex:
            logException(f"Error in changeFile : {ex}", True)
            # Chain the cause so the original traceback is not lost.
            raise Exception(ex) from ex

    def is_connected(self):
        """Return the ``is_connected`` attribute of the underlying group call."""
        return self.pytgcalls.is_connected

    async def getCurrentCall(self):
        """Return the result of ``pytgcalls.get_group_call`` for this chat."""
        self.logInfo("Melakukan panggilan untuk mendapatkan panggilan saat ini dalam obrolan.")
        return await self.pytgcalls.get_group_call(self.chat_id)

    async def preCheck(self, botClient, useClient):
        """Make sure the helper account is in the chat, joining it if needed.

        Args:
            botClient: Client used to look up the helper account and to
                export an invite link.
            useClient: Client that joins the chat through that invite link.

        Returns:
            ``True`` when the helper is present (or has just joined),
            otherwise a user-facing error message string.
        """
        try:
            await botClient.get_chat_member(
                chat_id=self.chat_id,
                user_id=config.get('HELPER_ACT_ID'),
            )
            return True
        except PeerIdInvalid:
            return True
        except Exception as ex:
            # Any other lookup failure falls through to the invite-link path.
            logWarning(f"Error while checkign if helper act is present : {ex}")

        try:
            invitelink = await botClient.export_chat_invite_link(self.chat_id)
            await asyncio.sleep(0.0)
        except Exception as ex:
            logWarning(
                f"Bisa mengabaikan ini => export_chat_invite_link : {self.chat_id} {ex}")
            return f"🕵️♀️**Ups, tambahkan bot sebagai admin di obrolan dan berikan semua izin.**"

        try:
            await useClient.join_chat(invitelink)
            return True
        except UserAlreadyParticipant:
            return True
        except InviteHashExpired as ie:
            logWarning(
                f"Can ignore this => User is denied : {self.chat_id} {ie}")
            return f"__Error saat menambahkan akun Asisten `[` {config.get('HELPER_ACT')} `]` in chat.__\n\n**Pastikan akun Asisten tidak ada dalam daftar pengguna yang dihapus.__**"
        except FloodWait as fe:
            logWarning(
                f"Can ignore this => User is denied : {self.chat_id} {fe}")
            return f"__Flood Wait Error saat menambahkan akun Asisten `[` {config.get('HELPER_ACT')} `]` dalam obrolan.__\n\n**__please wait {fe.x} detik atau tambahkan akun Asisten secara manual.__**"
        except Exception as ex:
            logWarning(
                f"Can ignore this => Error : {self.chat_id} {ex}")
            return f"__Error while adding helper account `[` {config.get('HELPER_ACT')} `]` in chat.__\n\n**__{ex}__**"

    async def start_playback(self, fileName, songInfo, requested_by):
        """Start the group call for this chat and register the first song.

        Returns:
            ``True`` on success, otherwise a user-facing error message
            string.
        """
        try:
            self.logInfo(
                f"Memulai pemutaran di obrolan: antrian lagu saat ini : {self.songs}")
            try:
                await self.pytgcalls.start(self.chat_id)
            except Exception as e:
                return f"✖️ **Error while starting the playback:** __{e}__"
            self.songs.append(
                {"file": fileName, "song": songInfo, "requested_by": requested_by, "time": time.time()})
            self.status = "playing"
            self.active = True
            return True
        except Exception as ex:
            self.logException(f"Error while starting the playback: {ex}", True)
            return f"**__{ex}__**\n\nSilakan tambahkan akun Asisten `[` {config.get('HELPER_ACT')} `]` dalam obrolan ini dan kemudian kirim perintah lagi.__"

    async def addSongsInQueue(self, fileName, songInfo, requested_by):
        """Append a song to the chat's queue.

        Returns:
            ``(flag, message)``. ``flag`` is ``False`` only when an exception
            occurred; *message* is the user-facing text.
        """
        try:
            self.logInfo(
                f"Menambahkan lagu ke antrian : {songInfo} , {fileName}")
            if queues.size(self.chat_id) >= config.get('PLAYLIST_SIZE'):
                # NOTE(review): returns True although nothing was queued -
                # confirm callers rely on this before changing it.
                return True, f"✖️**__Saat ini paling banyak {config.get('PLAYLIST_SIZE')} lagu dapat ditambahkan dalam antrian. Silakan coba lagi setelah beberapa lagu berakhir.__**"
            await queues.put(self.chat_id, file=fileName, song=songInfo, requested_by=requested_by)
            self.songs.append(
                {"file": fileName, "song": songInfo, "requested_by": requested_by, "time": time.time()})
            req_by = f"[{requested_by['title']}](tg://user?id={requested_by['chat_id']})"
            return True, f"**✅ Ditambahkan ke antrian.**\n**Name:** `{(songInfo['title'].strip())[:20]}`\n**Peminta:** {req_by}"
        except Exception as ex:
            self.logException(f"Error while addSongsInQueue: {ex}", True)
            # f-string so the message carries the error, not a literal "{ex}".
            return False, f"**__Error saat menambahkan lagu dalam antrian : {ex}.__**"

    async def skipPlayBack(self, fromCommand=False):
        """Move on to the next queued song.

        When nothing is waiting, playback is stopped unless the skip came
        from a user command (``fromCommand=True``).

        Returns:
            ``(True, message)`` when a new song started, otherwise
            ``(False, message)``.
        """
        try:
            self.logInfo(
                f"Skipping the playback : from handler : {not fromCommand} ")
            queues.task_done(self.chat_id)
            if queues.is_empty(self.chat_id) is True or len(self.songs) == 0:
                if fromCommand is False:
                    await self.stopPlayBack()
                return False, f"**__😬 Tidak ada lagu yang menunggu dalam antrian, Jika Anda ingin berhenti mengirim /stop.__**"

            # songs[0] is the song that just finished; drop it and hand its
            # file to changeFile() for deletion.
            old_file = self.songs[0].get('file')
            self.songs.pop(0)
            new_song = queues.get(self.chat_id)
            await self.changeFile(
                new_song["file"], new_song["song"], new_song['requested_by'], old_file)
            return True, f"**__Berhasil melewatkan lagu.__**\n\n**Sedang dimainkan ▶️** `{new_song['song']['title']}`"
        except Exception as ex:
            self.logException(f"Kesalahan saat melewatkanPlayBack: {ex}", True)
            return False, f"**__Kesalahan saat melewatkan : {ex}__**"

    async def stopPlayBack(self, fromHandler=False, sendMessage=False, force=False):
        """Stop playback and reset this instance's local state.

        Args:
            fromHandler: ``True`` when called from the network-status
                handler; the leave/stop calls on the group call are skipped.
            sendMessage: When ``True`` and a bot client is set, send the
                "playback ended" message to the chat.
            force: When ``True``, additionally send a raw ``LeaveGroupCall``
                request through the user client.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on error.
        """
        try:
            self.logInfo(
                f"Menghentikan pemutaran : fromHandler : {fromHandler} ")
            # Delete all downloaded songs in the queue.
            for s in self.songs:
                try:
                    if s.get("file") is not None and os.path.exists(s.get("file")):
                        os.remove(s.get("file"))
                except Exception as ex:
                    self.logException(
                        f"Error while remving the file : {s.get('file')} : {ex}", True)
            self.songs = []
            self.active = False
            self.status = 'stopped'
            try:
                queues.clear(self.chat_id)
            except QueueEmpty:
                pass

            if fromHandler is False:
                try:
                    await self.pytgcalls.leave_current_group_call()
                except Exception as ex:
                    logWarning(
                        f"Can be ignored : leave_current_group_call :{ex} , {self.chat_id}")
                finally:
                    await asyncio.sleep(0.1)
                try:
                    if self.is_connected() is True:
                        await self.pytgcalls.stop()
                except Exception as ex:
                    logWarning(
                        f"Can be ignored : pytgcalls.stop :{ex} , {self.chat_id}")

            if force is True:
                try:
                    input_peer = await self.client.resolve_peer(self.chat_id)
                    chat = await self.client.send(GetFullChannel(channel=input_peer))
                    leave_call = LeaveGroupCall(call=chat.full_chat.call,
                                                source=506)
                    await self.client.send(leave_call)
                except Exception as ex:
                    logWarning(f"Gagal memaksa pergi :{ex}")
                await asyncio.sleep(0.1)

            resp_message = "**Pemutaran berakhir dan terima kasih 🙏🏻 telah mencoba dan menguji layanan.**\n__Berikan tanggapan/saran Anda @xskull7.__"
            if sendMessage is True and self.bot_client is not None:
                resp_message = "**Pemutaran berakhir `[Jika Anda berada di tengah-tengah sebuah lagu dan Anda mendapatkan pesan ini, maka ini terjadi karena penerapan. Anda dapat bermain lagi setelah beberapa waktu.]`**\n\n__Terima kasih telah mencoba dan memberikan umpan balik / saran Anda @xskull7.__"
                await self.bot_client.send_message(self.chat_id, f"{resp_message}")
            return True, resp_message
        except BotMethodInvalid as bi:
            self.logWarn(f"Kesalahan yang diharapkan saat menghentikan pemutaran : {bi}")
            return False, f"**__Error while stopping : {bi}__**"
        except Exception as ex:
            self.logException(f"Error while stopping the playback: {ex}", True)
            return False, f"**__Error while stopping : {ex}__**"

    async def getCurrentInfo(self, userId):
        """Build a status summary for this chat.

        Users whose id appears in the ``GLOBAL_ADMINS`` config entries also
        get one line per group call known to ``MusicPlayer``.

        Returns:
            ``(True, text)`` on success, ``(False, message)`` on error.
        """
        try:
            self.logInfo(
                f"Mendapatkan info terkini : {self.status}")
            now_playing = ''
            if len(self.songs) > 0:
                now_playing = '**[** `' + self.songs[0].get('song').get('title') + '` **]**'
            parts = [
                f"**Aktif [ {self.chat_id} ] :** `{self.active}` {now_playing}",
                f"\n**Loop:** `{self.repeatCount if self.repeatCount > 1 else 'OFF'}`",
            ]

            if queues.is_empty(self.chat_id) is True or len(self.songs) == 0:
                parts.append("\n\n__Tidak ada lagu dalam antrian.__")
            else:
                parts.append("\n\n**Mendatang:**\n")
                # songs[0] is the current song, so upcoming ones start at 1.
                for position, entry in enumerate(self.songs[1:], start=1):
                    parts.append(
                        f"**{position}.** `{entry.get('song').get('title')}`\n")

            if any(admin['chat_id'] == userId for admin in config.get('GLOBAL_ADMINS')):
                parts.append("\n\n**Tampilan Admin:**")
                music_player = MusicPlayer()
                for count, (chat_id, gc_instance) in enumerate(
                        music_player.group_calls.items(), start=1):
                    queued = len(gc_instance.songs) if gc_instance.songs is not None else None
                    # Report the listed instance's own loop setting, not the
                    # one of the instance answering the request.
                    loop = gc_instance.repeatCount if gc_instance.repeatCount > 1 else 'OFF'
                    parts.append(
                        f"\n{count}. {chat_id} - **Aktif:** `{gc_instance.active}` **Status:** `{gc_instance.status}`")
                    parts.append(
                        f" **Antri:** `{queued}` , **Loop:** `{loop}`")
            return True, "".join(parts)
        except Exception as ex:
            self.logException(f"Error in getCurrentInfo : {ex}", True)
            return False, f"**__Error while fetching the info : {ex}__**"
class MusicPlayer(metaclass=Singleton):
    """Process-wide registry of per-chat group-call instances."""

    def __init__(self):
        """Start with no group calls and read the simultaneous-call limit."""
        # Maps chat id -> GoupCallInstance.
        self.group_calls = {}
        self.SIMULTANEOUS_CALLS = config.get('SIMULTANEOUS_CALLS')

    def cleanTheGroupCallDict(self):
        """Drop every registered instance whose ``active`` flag is not True."""
        try:
            still_active = {
                key: instance
                for key, instance in self.group_calls.items()
                if instance.active is True
            }
            logInfo(
                f"cleanTheGroupCallDict : New {len(still_active)} , old : {len(self.group_calls)}")
            self.group_calls = still_active
        except Exception as ex:
            logException(f"Error in cleanTheGroupCallDict {ex}", True)

    def createGroupCallInstance(self, chat_id, mongo_doc, client, force=False):
        """Return the group-call instance for *chat_id*, creating it if needed.

        Inactive instances are purged first. A new instance is refused when
        the chat is not in the ``SUDO_CHAT`` config value, the number of
        registered calls has reached ``SIMULTANEOUS_CALLS`` and *force* is
        False.

        Returns:
            ``(instance, "")`` on success, or ``(None, message)`` when the
            instance could not be created.
        """
        try:
            logInfo(
                f"Call for Creating new group call instance : {chat_id} {len(self.group_calls)}")
            self.cleanTheGroupCallDict()

            existing = self.group_calls.get(chat_id)
            if existing is not None:
                logInfo(f"GroupCall Instance already exists.")
                return existing, ""

            # Check whether a new instance may be created.
            if (
                chat_id not in config.get('SUDO_CHAT')
                and len(self.group_calls) >= self.SIMULTANEOUS_CALLS
                and force is False
            ):
                return None, f"**__😖 Maaf tapi saat ini layanan sedang digunakan di `{len(self.group_calls)}` groups/channels dan saat ini karena kurangnya sumber daya yang kami dukung secara maksimal `{self.SIMULTANEOUS_CALLS}` simultaneous playbacks.__**"

            logInfo(
                f"Creating new group call instance : {chat_id} {len(self.group_calls)}")
            created = GoupCallInstance(chat_id, mongo_doc, client)
            self.group_calls[chat_id] = created
            return created, ""
        except Exception as ex:
            logException(f"Error while getting group call {ex}", True)
            return None, "**__🙄 Kesalahan Tak Terduga, Yakinlah pikiran terbaik kami telah diberitahu dan mereka sedang mengerjakannya.__**"

    async def shutdown(self):
        """Stop playback on every active instance, notifying each chat.

        A failure on one chat is logged and does not prevent the remaining
        chats from being stopped.
        """
        try:
            for chat_id, instance in self.group_calls.items():
                try:
                    if instance is None or instance.active is not True:
                        continue
                    await instance.stopPlayBack(fromHandler=False, sendMessage=True)
                except Exception as ex:
                    logException(
                        f"Error while shutting down {chat_id}, {ex}", True)
        except Exception as ex:
            logException(
                f"Error while shutting down all instances of music player {ex}", True)