forked from FlxVctr/RADICES
-
Notifications
You must be signed in to change notification settings - Fork 0
/
start.py
170 lines (144 loc) · 6.95 KB
/
start.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import argparse
from datetime import datetime
import os
import time
import traceback
from shutil import copyfile
from sys import stderr, stdout
import pandas as pd
from collector import Coordinator
from setup import Config
def main_loop(coordinator, select=None, status_lang=None, test_fail=False, restart=False,
              bootstrap=False, language_threshold=0, keywords=None):
    """Run one full collection pass.

    Records the pass start time in the `timetable` DB table, starts the
    collector threads via the coordinator, and joins them, re-raising any
    error a collector thread stored on itself.

    Args:
        coordinator: Coordinator instance owning the DB handle (`dbh`) and
            the collector threads.
        select: list of Twitter user detail columns to collect (default: none).
        status_lang: language code(s) a user's last status must match.
        test_fail: dev flag — make collectors raise an artificial error.
        restart: if True, un-burn friends collected after the previous
            recorded start time so the restarted walk can reuse them.
        bootstrap: expand the seed pool with friends/followers at every step.
        language_threshold: fraction (0..1) of last 200 tweets that must be
            in the chosen language(s).
        keywords: keywords that must appear in a user's last 200 tweets.

    Raises:
        RuntimeError: if a collector thread does not finish within the timeout.
        BaseException: whatever error a collector thread recorded in `.err`.
    """
    # Never use mutable default arguments — normalize None to fresh lists.
    select = [] if select is None else select
    keywords = [] if keywords is None else keywords

    try:
        latest_start_time = pd.read_sql_table('timetable', coordinator.dbh.engine)
        latest_start_time = latest_start_time['latest_start_time'][0]
    except ValueError:
        # Table does not exist yet (first ever run).
        latest_start_time = 0

    if restart:
        # Reset the `burned` flag on friends gathered after the last start,
        # so the restarted walk can visit them again. `latest_start_time`
        # comes from our own DB (a float we wrote), not user input, so the
        # f-string interpolation is safe here.
        update_query = f"""
            UPDATE friends
            SET burned=0
            WHERE UNIX_TIMESTAMP(timestamp) > {latest_start_time}
        """
        coordinator.dbh.engine.execute(update_query)

    # Persist this pass's start time (single-row table, replaced each pass).
    start_time = time.time()
    pd.DataFrame({'latest_start_time': [start_time]}).to_sql('timetable', coordinator.dbh.engine,
                                                             if_exists='replace')

    collectors = coordinator.start_collectors(select=select,
                                              status_lang=status_lang,
                                              fail=test_fail,
                                              restart=restart,
                                              retries=4,
                                              latest_start_time=latest_start_time,
                                              bootstrap=bootstrap,
                                              language_threshold=language_threshold,
                                              keywords=keywords)

    stdout.write("\nstarting {} collectors\n".format(len(collectors)))
    stdout.write(f"\nKeywords: {keywords}\n")
    stdout.flush()

    timeout = 7200  # seconds each collector thread may take to join
    for i, instance in enumerate(collectors, start=1):
        instance.join(timeout=timeout)
        if instance.is_alive():
            # join() returned because of the timeout, not completion.
            # (Original message embedded stray indentation via a backslash
            # continuation inside the f-string; fixed to a clean one-liner.)
            raise RuntimeError(
                f"Thread {instance.name} took longer than {timeout} seconds to finish.")
        if instance.err is not None:
            # Collector threads store their exception instead of raising in
            # the thread; surface it in the main thread.
            raise instance.err
        stdout.write(f"Thread {instance.name} joined. {i} collector(s) finished\n")
        stdout.flush()
# Script entry point: back up the previous seed file, parse CLI options,
# build a Coordinator, then run the collection loop forever, restarting
# from latest_seeds.csv whenever an unexpected exception escapes main_loop.
if __name__ == "__main__":
    # Backup latest_seeds.csv if exists
    if os.path.isfile("latest_seeds.csv"):
        # Timestamped copy; ":" is replaced because it is invalid in
        # filenames on some filesystems (e.g. Windows, FAT).
        copyfile("latest_seeds.csv",
                 "{}_latest_seeds.csv".format(datetime.now().isoformat().replace(":", "-")))

    # Get arguments from commandline
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--seeds', type=int, help="specify number of seeds", default=10)
    parser.add_argument('-l', '--language', nargs="+",
                        help="specify language codes of last status by users to gather")
    parser.add_argument('-lt', '--lthreshold', type=float,
                        help="fraction threshold (0 to 1) of last 200 tweets by an account that \
must have chosen languages detected (leads to less false positives but \
also more false negatives)", default=0)
    parser.add_argument('-k', '--keywords', nargs="+",
                        help="specify keywords contained in last 200 tweets by users to gather")
    parser.add_argument('-r', '--restart',
                        help="restart with latest seeds in latest_seeds.csv", action="store_true")
    parser.add_argument('-p', '--following_pages_limit', type=int,
                        help='''Define limit for maximum number of recent followings to retrieve per \
account to determine most followed friend.
1 page has a maximum of 5000 folllowings.
Lower values speed up collection. Default: 0 (unlimited)''', default=0)
    parser.add_argument('-b', '--bootstrap', help="at every step, add a seed's friends and followers \
to the seed pool from which accounts are chosen randomly if walkers are at an impasse",
                        action="store_true")
    parser.add_argument('-t', '--test', help="dev only: test for 2 loops only",
                        action="store_true")
    parser.add_argument('-f', '--fail', help="dev only: test unexpected exception",
                        action="store_true")
    args = parser.parse_args()

    config = Config()

    # Only user-detail columns that have a SQL datatype configured are
    # passed to main_loop's `select`.
    user_details_list = []
    for detail, sqldatatype in config.config["twitter_user_details"].items():
        if sqldatatype is not None:
            user_details_list.append(detail)

    if args.restart:
        # Resume the walk from the seeds saved by the previous run.
        latest_seeds_df = pd.read_csv('latest_seeds.csv', header=None)[0]
        latest_seeds = list(latest_seeds_df.values)
        coordinator = Coordinator(seed_list=latest_seeds,
                                  following_pages_limit=args.following_pages_limit)
        print("Restarting with latest seeds:\n")
        print(latest_seeds_df)
    else:
        coordinator = Coordinator(seeds=args.seeds,
                                  following_pages_limit=args.following_pages_limit)

    k = 0                # test-mode loop counter
    restart_counter = 0  # ensures restart=True is passed to main_loop only once
    while True:
        if args.test:
            k += 1
            if k == 2:
                # Second test loop: stop injecting the artificial failure.
                args.fail = False
            if k == 3:
                # Test mode runs exactly two loops.
                break
            stdout.write("\nTEST RUN {}\n".format(k))
            stdout.flush()
        try:
            if args.restart is True and restart_counter == 0:
                # First pass after a (re)start: let main_loop un-burn the
                # friends collected since the last recorded start time.
                main_loop(coordinator, select=user_details_list,
                          status_lang=args.language, test_fail=args.fail, restart=True,
                          bootstrap=args.bootstrap, language_threshold=args.lthreshold,
                          keywords=args.keywords)
                restart_counter += 1
            else:
                main_loop(coordinator, select=user_details_list,
                          status_lang=args.language, test_fail=args.fail, bootstrap=args.bootstrap,
                          language_threshold=args.lthreshold, keywords=args.keywords)
        except Exception:
            # Top-level crash barrier: report, optionally e-mail a
            # notification, then rebuild the coordinator from the last
            # saved seeds and retry.
            stdout.write("Encountered unexpected exception:\n")
            traceback.print_exc()
            try:
                if config.use_notifications is True:
                    response = config.send_mail({
                        "subject": "Unexpected Error",
                        "text":
                        f"Unexpected Error encountered.\n{traceback.format_exc()}"
                    }
                    )
                    # NOTE(review): assumes send_mail returns an HTTP-like
                    # response whose str() contains the status code — and
                    # `assert` is stripped under `python -O`; confirm intent.
                    assert '200' in str(response)
                    stdout.write(f"Sent notification to {config.notif_config['email_to_notify']}")
                    stdout.flush()
            except Exception:
                # A failed notification must not stop the retry loop.
                stderr.write('Could not send error-mail: \n')
                traceback.print_exc(file=stderr)
            stdout.write("Retrying in 5 seconds.")
            stdout.flush()
            # Reload seeds written by the crashed pass and force the
            # restart path on the next iteration.
            latest_seeds = list(pd.read_csv('latest_seeds.csv', header=None)[0].values)
            coordinator = Coordinator(seed_list=latest_seeds,
                                      following_pages_limit=args.following_pages_limit)
            args.restart = True
            restart_counter = 0
            time.sleep(5)