-
Notifications
You must be signed in to change notification settings - Fork 0
/
skyalert-cmds.py
483 lines (419 loc) · 23.8 KB
/
skyalert-cmds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
from atproto import Client, IdResolver, models, SessionEvent
import atproto_client
import json
import atproto_client.exceptions
import yaml
import os
import datetime
import time
import re
import tenacity
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
CONFIG_FILE = os.path.join(DATA_DIR, 'config.yaml')
CACHE_DIR = os.path.join(DATA_DIR, 'cache')
LAST_RUN_FILE = os.path.join(DATA_DIR, 'last_run.txt')
VERBOSE_PRINTING = False
global client
client = Client()
@client.on_session_change
def on_session_change(event: atproto_client.SessionEvent,session: atproto_client.Session):
if event==SessionEvent.CREATE or event==SessionEvent.REFRESH:
with open(os.path.join(DATA_DIR, 'login-info.yaml'), 'r') as f1:
login_info = yaml.safe_load(f1)
with open(os.path.join(DATA_DIR, 'login-info.yaml'), 'w') as f2:
new_login_info = {
'username': login_info['username'],
'password': login_info['password'],
'session-key-firehose': session.export(),
'session-key-cmds': login_info['session-key-cmds'] if 'session-key-cmds' in login_info else ""
}
yaml.dump(new_login_info, f2)
with open(os.path.join(DATA_DIR, 'login-info.yaml'), 'r') as f:
login_info = yaml.safe_load(f)
if 'session-key-firehose' in login_info and login_info['session-key-firehose']:
client.login(session_string=login_info['session-key-firehose'])
else:
client.login(login=login_info['username'], password=login_info['password'])
# Link detection by latchk3y on the Bluesky API Discord server
def get_facets(text):
pattern = r'(https?://[^\s]+)'
links = re.findall(pattern, text)
facets = []
for link in links:
start_index = text.index(link)
end_index = start_index + len(link)
facets.append({
"index": {
"byteStart": start_index,
"byteEnd": end_index },
"features": [
{
"$type": "app.bsky.richtext.facet#link",
"uri": link
}
]
})
if not facets:
return None
return facets
def get_config():
if not os.path.exists(CONFIG_FILE):
return {'user_watches': [], 'follow_watches': []}
with open(CONFIG_FILE, 'r') as f:
config = yaml.safe_load(f)
return config
def save_config(config):
with open(CONFIG_FILE, 'w') as f:
yaml.dump(config, f)
def get_last_run():
if not os.path.exists(LAST_RUN_FILE):
return None
with open(LAST_RUN_FILE, 'r') as f:
return datetime.datetime.fromisoformat(f.read())
def save_last_run():
with open(LAST_RUN_FILE, 'w') as f:
f.write(datetime.datetime.now(datetime.timezone.utc).isoformat())
def post_url_from_at_uri(at_uri):
# Split the AT URI to extract the DID and the random string
parts = at_uri.split('/')
did = parts[2]
random_string = parts[-1]
# Construct the Bluesky URL
url = f"https://bsky.app/profile/{did}/post/{random_string}"
return url
def bridgy_to_fed(handle: str):
if handle.endswith("ap.brid.gy"):
parts = handle.split('.')
if len(parts) >= 3:
username = parts[0]
domain = '.'.join(parts[1:-3])
return f"@{username}@{domain} (Bridgy)"
else:
return f"@{handle}"
else:
if handle.endswith("bsky.social"):
handle = handle[:-12] # Remove ".bsky.social" from the end
return handle
def fed_to_bridgy(handle: str):
if "@" in handle:
parts = handle.split('@')
if len(parts) >= 3: # If given a Fediverse handle, output the Bridgy handle
username = parts[1]
domain = '.'.join(parts[2:])
return f"{username}.{domain}.ap.brid.gy"
else: # If given a normal Bluesky handle, output the handle ensuring there is no @ at the beginning
return handle.lstrip('@')
else:
if not handle.endswith("bsky.social") and '.' not in handle:
handle += ".bsky.social"
return handle
def send_dm(to,message):
dm_client = client.with_bsky_chat_proxy()
dm = dm_client.chat.bsky.convo
# create resolver instance with in-memory cache
id_resolver = IdResolver()
# resolve DID
chat_to = to if "did:plc:" in to else id_resolver.handle.resolve(to)
# create or get conversation with chat_to
convo = dm.get_convo_for_members(
models.ChatBskyConvoGetConvoForMembers.Params(members=[chat_to, client.me.did]),
).convo
# send a message to the conversation
dm.send_message(
models.ChatBskyConvoSendMessage.Data(
convo_id=convo.id,
message=models.ChatBskyConvoDefs.MessageInput(
text=message,
facets=get_facets(message)
),
)
)
if VERBOSE_PRINTING: print('\nMessage sent!')
# logic for handling bot commands
def bot_commands_handler():
dm_client = client.with_bsky_chat_proxy()
dm = dm_client.chat.bsky.convo
if VERBOSE_PRINTING: print("Checking for bot commands...")
# check for commands sent to the bot
dmconvos_objs = dm.list_convos().model_dump()
dmconvos = []
dmconvos.extend(dmconvos_objs['convos'])
while True:
if "cursor" not in dmconvos_objs or dmconvos_objs["cursor"] is None:
break
dmconvos.extend(dmconvos_objs['convos'])
dmconvos_objs = dm.list_convos(cursor=dmconvos_objs['cursor']).model_dump()
for convo in dmconvos:
if convo['last_message']['sender']['did'] == client.me.did:
continue # The bot sent the last message, skip
else:
senderprofile = client.get_profile(convo['last_message']['sender']['did']).model_dump()
senderhandle = senderprofile['handle']
if convo['last_message']['text'].lower() == "!help":
if VERBOSE_PRINTING: print(f"Sending help message to {senderhandle}...")
message = "SkyAlert is a bot that can notify you about posts from people you watch or if someone unfollows you. To set up a watch, send me a DM with the following commands:\n\n!watch <subject> [reposts-allowed] - Watch a subject for new posts. You will be notified when the subject posts. If reposts-allowed is true, you will be notified on reposts.\n!unwatch <subject> - Stop watching a subject.\n!mywatches - List the subjects you are watching and the status of the follow watch feature.\n!repost-default <true/false> - Set the default reposts-allowed setting for new watches.\n!followwatch <true/false> - Enable or disable notifications for unfollows. You will be notified when someone unfollows you.\n!replies <true/false> - If this is true, you will see replies posted by the subjects you are watching."
send_dm(convo['last_message']['sender']['did'], message)
elif convo['last_message']['text'].lower().startswith("!watch"):
if VERBOSE_PRINTING: print(f"Processing watch command from {senderhandle}...")
parts = convo['last_message']['text'].split(' ')
if len(parts) < 2:
message = "Not enough arguments. Usage: !watch <subject> [reposts-allowed]"
send_dm(convo['last_message']['sender']['did'], message)
else:
config = get_config()
subject = fed_to_bridgy(parts[1])
reposts_allowed = False
if len(parts) == 3:
reposts_allowed = parts[2].lower() == "true"
else:
for entry in config.get('repost_defaults', []):
if entry['did'] == convo['last_message']['sender']['did']:
reposts_allowed = entry['reposts-allowed']
break
if not IdResolver().handle.resolve(subject):
message = ""
if subject.endswith("ap.brid.gy"):
message = f"Invalid subject handle. You entered a Fediverse or Bridgy Fed handle; that user may not be using Bridgy Fed."
elif "threads.net" in subject:
message = f"Invalid subject handle. Meta Threads users cannot follow Fediverse accounts yet, so Threads cannot work with Bridgy Fed."
elif not subject.endswith(".bsky.social"):
message = f"Invalid subject handle. Most Bluesky handles end in .bsky.social, try adding that."
else:
message = f"Invalid subject handle. There was no other reason the handle could be invalid, so that user likely does not exist or you made a typo."
send_dm(convo['last_message']['sender']['did'], message)
continue
config['user_watches'].append({'subject-handle': subject, 'receiver-handle': senderhandle, 'reposts-allowed': reposts_allowed, 'subject-did': IdResolver().handle.resolve(subject), 'receiver-did': convo['last_message']['sender']['did']})
save_config(config)
message = f"Watching {bridgy_to_fed(subject)} for new posts. Reposts allowed: {reposts_allowed}. You will be notified when the subject posts."
send_dm(convo['last_message']['sender']['did'], message)
elif convo['last_message']['text'].lower().startswith("!unwatch"):
if VERBOSE_PRINTING: print(f"Processing unwatch command from {senderhandle}...")
parts = convo['last_message']['text'].split(' ')
if len(parts) != 2 or parts[1] == "":
message = "Not enough arguments. Usage: !unwatch <subject>"
send_dm(convo['last_message']['sender']['did'], message)
else:
config = get_config()
subject_handle = fed_to_bridgy(parts[1])
receiver_handle = senderhandle
user_watches = config.get('user_watches', [])
new_user_watches = [watch for watch in user_watches if not (watch['subject-handle'] == subject_handle and watch['receiver-handle'] == receiver_handle)]
if len(user_watches) == len(new_user_watches):
message = f"No watch found for {bridgy_to_fed(subject_handle)}."
else:
config['user_watches'] = new_user_watches
save_config(config)
message = f"Stopped watching {bridgy_to_fed(subject_handle)}."
send_dm(convo['last_message']['sender']['did'], message)
elif convo['last_message']['text'].lower() == "!mywatches":
if VERBOSE_PRINTING: print(f"Processing mywatches command from {senderhandle}...")
config = get_config()
follow_watches = config.get('follow_watches', [])
user_watches = config.get('user_watches', [])
# Check follow watch status
follow_watch_status = "disabled"
for watch in follow_watches:
if watch['did'] == convo['last_message']['sender']['did']:
follow_watch_status = "enabled"
break
message = f"Follow watch notifications are {follow_watch_status}.\n\n"
# List user watches
user_watch_list = [watch for watch in user_watches if watch['receiver-handle'] == senderhandle]
if user_watch_list:
message += "You are watching the following subjects:\n"
lines = []
for watch in user_watch_list:
lines.append(f"- {bridgy_to_fed(watch['subject-handle'])} (Reposts allowed: {watch['reposts-allowed']})")
message += "\n".join(lines)
else:
message += "You are not watching any subjects."
send_dm(convo['last_message']['sender']['did'], message)
elif convo['last_message']['text'].lower().startswith("!repost-default"):
if VERBOSE_PRINTING: print(f"Processing repost-default command from {senderhandle}...")
parts = convo['last_message']['text'].split(' ')
if len(parts) != 2 or parts[1] == "":
current_default = next((entry['reposts-allowed'] for entry in get_config().get('repost_defaults', []) if entry['did'] == convo['last_message']['sender']['did']), None)
message = f"Not enough arguments. Usage: !repost-default <true/false>\nCurrent default setting: {current_default}"
send_dm(convo['last_message']['sender']['did'], message)
else:
config = get_config()
repost_default = parts[1].lower() == "true"
repost_defaults = config.get('repost_defaults', [])
if any(entry['did'] == convo['last_message']['sender']['did'] for entry in repost_defaults):
repost_defaults = [entry for entry in repost_defaults if entry['did'] != convo['last_message']['sender']['did']]
repost_defaults.append({'did': convo['last_message']['sender']['did'], 'reposts-allowed': repost_default})
config['repost_defaults'] = repost_defaults
save_config(config)
message = f"Default reposts-allowed setting set to {repost_default}."
send_dm(convo['last_message']['sender']['did'], message)
elif convo['last_message']['text'].lower().startswith("!followwatch"):
if VERBOSE_PRINTING: print(f"Processing followwatch command from {senderhandle}...")
parts = convo['last_message']['text'].split(' ')
if len(parts) != 2 or parts[1] == "":
message = "Not enough arguments. Usage: !followwatch <true/false>"
send_dm(convo['last_message']['sender']['did'], message)
else:
config = get_config()
followwatch = parts[1].lower() == "true"
follow_watches = config.get('follow_watches', [])
if followwatch:
if not any(watch['did'] == convo['last_message']['sender']['did'] for watch in follow_watches):
follow_watches.append({'did': convo['last_message']['sender']['did'], 'handle': senderhandle})
message = "Notifications enabled for unfollows."
else:
message = "Notifications already enabled for unfollows."
else:
if any(watch['did'] == convo['last_message']['sender']['did'] for watch in follow_watches):
follow_watches = [watch for watch in follow_watches if watch['did'] != convo['last_message']['sender']['did']]
message = "Notifications disabled for unfollows."
else:
message = "Notifications already disabled for unfollows."
config['follow_watches'] = follow_watches
save_config(config)
send_dm(convo['last_message']['sender']['did'], message)
elif convo['last_message']['text'].lower().startswith("!replies"):
if VERBOSE_PRINTING: print(f"Processing replies command from {senderhandle}...")
parts = convo['last_message']['text'].split(' ')
if len(parts) != 2 or parts[1] == "":
current_setting = next((entry['replies-allowed'] for entry in get_config().get('reply_settings', []) if entry['did'] == convo['last_message']['sender']['did']), None)
message = f"Not enough arguments. Usage: !replies <true/false>\nCurrent setting: {current_setting}"
send_dm(convo['last_message']['sender']['did'], message)
else:
config = get_config()
replies_allowed = parts[1].lower() == "true"
reply_settings = config.get('reply_settings', [])
if any(entry['did'] == convo['last_message']['sender']['did'] for entry in reply_settings):
reply_settings = [entry for entry in reply_settings if entry['did'] != convo['last_message']['sender']['did']]
reply_settings.append({'did': convo['last_message']['sender']['did'], 'replies-allowed': replies_allowed})
config['reply_settings'] = reply_settings
save_config(config)
message = f"Replies allowed setting set to {replies_allowed}."
send_dm(convo['last_message']['sender']['did'], message)
# main logic
def main():
# verify user watch validity
if VERBOSE_PRINTING: print("Checking user watches...")
for watch in get_config().get('user_watches'):
if VERBOSE_PRINTING: print(f"Checking watch for {watch.get('subject')} with receiver {watch.get('receiver')}...")
if VERBOSE_PRINTING: print("Verifying DIDs...")
receiver_did = watch.get('receiver-did')
subject_did = watch.get('subject-did')
receiver_handle = watch.get('receiver-handle')
subject_handle = watch.get('subject-handle')
if receiver_did == None or receiver_did == "":
if VERBOSE_PRINTING: print("Invalid receiver, all watches for this receiver will be removed...")
config = get_config()
config['user_watches'] = [w for w in config['user_watches'] if w['receiver'] != watch.get('receiver')]
save_config(config)
continue
if subject_did == None or subject_did == "":
if VERBOSE_PRINTING: print("Invalid subject, the watch will be removed...")
send_dm(receiver_did, f"You're no longer watching {watch.get('subject-handle')} because the handle is invalid.")
config = get_config()
config['user_watches'] = [w for w in config['user_watches'] if not (w['subject'] == watch.get('subject') and w['receiver'] == watch.get('receiver'))]
save_config(config)
continue
subject_profile = client.get_profile(subject_did).model_dump()
if subject_profile['handle'] != subject_handle:
config = get_config()
for watch in config['user_watches']:
if watch['subject-did'] == subject_did:
watch['subject-handle'] = subject_profile['handle']
save_config(config)
receiver_profile = client.get_profile(receiver_did).model_dump()
if receiver_profile['handle'] != receiver_handle:
config = get_config()
for watch in config['user_watches']:
if watch['receiver-did'] == receiver_did:
watch['receiver-handle'] = receiver_profile['handle']
save_config(config)
# logic for follow watches (user is notified when someone unfollows them)
# this does not need to be real-time, so it can run by polling
if VERBOSE_PRINTING: print("Checking follow watches...")
for user in get_config().get('follow_watches'):
if VERBOSE_PRINTING: print(f"Checking watch for {user}...")
if VERBOSE_PRINTING: print("Verifying DID...")
user_did = user['did']
user_handle = user['handle']
if user_did == None or user_did == "":
if VERBOSE_PRINTING: print("Invalid user, watch will be removed...")
config = get_config()
config['follow_watches'] = [w for w in config['follow_watches'] if w != user]
save_config(config)
continue
if VERBOSE_PRINTING: print("Loading cached followers...")
cached_followers = []
if not os.path.exists(f"{CACHE_DIR}/followers-{user_did}.yaml"):
with open(f"{CACHE_DIR}/followers-{user_did}.yaml", 'w') as f:
cached_followers = []
yaml.dump(cached_followers, f)
else:
with open(f"{CACHE_DIR}/followers-{user_did}.yaml", 'r') as f:
cached_followers = yaml.safe_load(f)
if VERBOSE_PRINTING: print("Pulling current followers...")
# Retrieve all current followers
current_followers_objs = []
current_followers_objs.append(client.get_followers(user_did).model_dump())
while True:
if "cursor" not in current_followers_objs[-1] or current_followers_objs[-1]["cursor"] is None:
break
current_followers_objs.append(client.get_followers(user_did, cursor=current_followers_objs[-1]['cursor']).model_dump())
if VERBOSE_PRINTING: print("Getting DIDs of current followers...")
# Separate the DIDs from the API objects
current_followers_dids = []
for obj in current_followers_objs:
for follower in obj['followers']:
current_followers_dids.append(follower['did'])
if VERBOSE_PRINTING: print("Checking for unfollows...")
# Check for unfollowers
unfollowed_dids = []
for cached_did in cached_followers:
if cached_did not in current_followers_dids:
unfollowed_dids.append(cached_did)
if unfollowed_dids:
message = "These users have unfollowed you:\n"
profile_lines = []
profile_fail = False
for did in unfollowed_dids:
try:
profile = client.get_profile(did).model_dump()
profile_lines.append(f"- {profile['handle']}")
except:
profile_lines.append(f"- {did}")
profile_fail = True
message += "\n".join(profile_lines)
if profile_fail: message += "\n\nSome profiles could not be loaded, so their handles are replaced by a DID. This usually happens when someone deletes their account or their account was suspended by the Bluesky team."
send_dm(user_did, message)
if VERBOSE_PRINTING: print("Saving follower cache...")
# Update the cached followers list
with open(f"{CACHE_DIR}/followers-{user_did}.yaml", 'w') as f:
yaml.dump(current_followers_dids, f)
# # last run time was only needed for user watching, so it is not needed anymore
# if VERBOSE_PRINTING: print("Saving last run time...")
# save_last_run()
@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=1, min=4, max=60), # Exponential backoff
stop=tenacity.stop_after_attempt(5), # Stop after 5 attempts
retry=tenacity.retry_if_exception_type(atproto_client.exceptions.RequestException)
)
def bot_commands_handler_with_retry():
bot_commands_handler()
@tenacity.retry(
wait=tenacity.wait_exponential(multiplier=1, min=4, max=60), # Exponential backoff
stop=tenacity.stop_after_attempt(5), # Stop after 5 attempts
retry=tenacity.retry_if_exception_type(atproto_client.exceptions.RequestException)
)
def main_with_retry():
main()
time_waited = 0
cmd_check_interval = 30
main_interval = 3600
while True:
if time_waited % cmd_check_interval == 0: # this script continues to handle bot commands
bot_commands_handler_with_retry()
if time_waited % main_interval == 0: # this only handles follow watches
main_with_retry()
if time_waited == main_interval:
time_waited = 0
time.sleep(1)
time_waited += 1