-
Notifications
You must be signed in to change notification settings - Fork 0
/
davesync.py
executable file
·301 lines (228 loc) · 10.1 KB
/
davesync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#!/usr/bin/python
import os
import sys
import argparse
import gnupg
import tempfile
import json
import colorlog
import logging
import fnmatch
import subprocess
import re
from getpass import getpass
from pathlib import Path
from webdav4.client import Client
import webdav4
# ============================ FUNCS
def assert_on_bad_file(path):
    """Fail unless *path* is an existing, readable regular file.

    The check is an explicit ``raise`` instead of an ``assert`` statement so
    that it is still enforced when Python runs with ``-O`` (which strips
    ``assert``). The exception type is kept as ``AssertionError`` so existing
    callers see the same failure as before.

    Args:
        path: Filesystem path to validate.

    Raises:
        AssertionError: If *path* is not a regular file or is not readable
            by the current process.
    """
    if not (os.path.isfile(path) and os.access(path, os.R_OK)):
        raise AssertionError("File {} doesn't exist or isn't readable".format(path))
def assert_on_bad_dir(path):
    """Fail unless *path* is an existing, readable directory.

    The check is an explicit ``raise`` instead of an ``assert`` statement so
    that it is still enforced when Python runs with ``-O`` (which strips
    ``assert``). The exception type is kept as ``AssertionError`` so existing
    callers see the same failure as before.

    Args:
        path: Filesystem path to validate.

    Raises:
        AssertionError: If *path* is not a directory or is not readable by
            the current process.
    """
    if not (os.path.isdir(path) and os.access(path, os.R_OK)):
        raise AssertionError("Directory {} doesn't exist or isn't readable".format(path))
def assert_on_bad_webdav_dir(path):
    """Verify that *path* is a directory on the WebDav server.

    Uses the module-level ``webdav`` client, ``remote_base`` and ``logger``.

    Args:
        path: Path handed to ``webdav.isdir``; it is appended to
            ``remote_base`` only to build the error message.

    Raises:
        AssertionError: If ``webdav.isdir(path)`` returns a falsy value.
            Raised explicitly (not via ``assert``) so the check survives
            ``python -O``.
        SystemExit: With status 1, after logging a critical message, when
            the client raises ``ResourceNotFound``.
    """
    error_msg = "WebDav directory '{}' doesn't exist".format(remote_base + '/' + path)
    try:
        # Only the remote call can raise ResourceNotFound; keep the try minimal.
        is_dir = webdav.isdir(path)
    except webdav4.client.ResourceNotFound:
        logger.critical(error_msg)
        sys.exit(1)
    if not is_dir:
        raise AssertionError(error_msg)
def run_command(command):
    """Run *command* through the shell and return its stripped stdout.

    The command string comes from the operator's own command-line options
    (password/passphrase commands), so it is deliberately executed with
    ``shell=True`` to allow pipes and quoting.

    Args:
        command: Shell command line to execute.

    Returns:
        The command's standard output with surrounding whitespace removed.

    Raises:
        SystemExit: With status 1 if the command exits with a non-zero
            status; the failure (and any captured stderr) is logged through
            the module-level ``logger`` first.
    """
    try:
        completed = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        logger.critical(f"Error executing command: {e}")
        # stderr is captured by capture_output=True, so without this the
        # reason for the failure would never reach the user.
        stderr_text = (e.stderr or '').strip()
        if stderr_text:
            logger.critical(stderr_text)
        sys.exit(1)
    return completed.stdout.strip()
def encrypt_file(path):
    """Symmetrically encrypt *path* with GPG into a new temporary file.

    Uses the module-level ``gpg`` object, ``gpg_passphrase`` and the cipher,
    digest and compression options from ``args``. The original base name is
    passed to gpg via ``--set-filename``.

    Args:
        path: Path of the plaintext file to encrypt.

    Returns:
        Path of the temporary file holding the encrypted output. The caller
        is responsible for deleting it.

    Raises:
        RuntimeError: If gpg reports a failed encryption. The temporary file
            is removed before the error propagates.
    """
    with open(path, 'rb') as plaintext:
        fd, tempfilepath = tempfile.mkstemp()
        # mkstemp() hands back an open descriptor; the output is written by
        # path, so close ours right away instead of leaking one per file.
        os.close(fd)
        try:
            res = gpg.encrypt_file(plaintext, None, symmetric=args.cipher_algo, passphrase=gpg_passphrase,
                                   output=tempfilepath, armor=False,
                                   extra_args=['--s2k-digest-algo', args.digest_algo,
                                               '--compress-algo', args.compress_algo,
                                               '-z', args.compress_level,
                                               '--set-filename', os.path.basename(path)])
            if not res:
                raise RuntimeError(f'GPG Error: {res.status}')
        except BaseException:
            # Never leave a partial or empty ciphertext behind on failure.
            if os.path.exists(tempfilepath):
                os.unlink(tempfilepath)
            raise
    return tempfilepath
def remove_str_suffix(text, suffix):
    """Return *text* without a trailing *suffix*, if it has one.

    Args:
        text: String to trim, or ``None``.
        suffix: Suffix to remove, or ``None``.

    Returns:
        *text* with one trailing occurrence of *suffix* removed. *text* is
        returned unchanged when either argument is ``None`` or empty, or
        when *text* does not end with *suffix*.
    """
    # An empty suffix must be a no-op: every string "ends with" '' and
    # text[:-0] is text[:0], which would wrongly return ''.
    if text and suffix and text.endswith(suffix):
        return text[:-len(suffix)]
    return text
def read_webdav_password():
    """Resolve the WebDav password from the command-line options.

    Sources are tried in this order: the password command, the password
    file, the literal password option, and finally an interactive prompt
    (only when a WebDav user was given).

    Returns:
        The password string, or ``None`` when no source applies.
    """
    password_command = args.webdav_password_command
    if password_command:
        return run_command(password_command)

    password_file = args.webdav_password_file
    if password_file:
        assert_on_bad_file(password_file)
        return Path(password_file).read_text().strip()

    if args.webdav_password:
        return args.webdav_password

    # Prompt only when authentication was actually requested.
    if args.webdav_user:
        return getpass('WebDav Password:')

    return None
def read_gpg_passphrase():
    """Resolve the GPG passphrase from the command-line options.

    Sources are tried in this order: the passphrase command, the passphrase
    file, the literal passphrase option, and finally an interactive prompt.

    Returns:
        The passphrase string obtained from the first applicable source.
    """
    passphrase_command = args.gpg_passphrase_command
    if passphrase_command:
        return run_command(passphrase_command)

    passphrase_file = args.gpg_passphrase_file
    if passphrase_file:
        assert_on_bad_file(passphrase_file)
        return Path(passphrase_file).read_text().strip()

    if args.gpg_passphrase:
        return args.gpg_passphrase

    # Nothing supplied on the command line: ask the user.
    return getpass('GPG Passphrase:')
def create_logger():
    """Build the colourised logger used by the script.

    The verbosity count in ``args.verbose`` selects both the logger and its
    level: below 2 a logger named after this file is used, otherwise the
    root logger; the level is DEBUG from verbosity 1 upwards and INFO
    otherwise.

    Returns:
        The configured logger with a colorlog stream handler attached.
    """
    verbosity = args.verbose

    if verbosity < 2:
        log = colorlog.getLogger(__file__)
    else:
        log = colorlog.getLogger()

    stream_handler = colorlog.StreamHandler()
    stream_handler.setFormatter(colorlog.ColoredFormatter("%(log_color)s %(message)s"))
    log.addHandler(stream_handler)

    log.setLevel(logging.DEBUG if verbosity >= 1 else logging.INFO)
    return log
def excluded_path(path):
    """Tell whether *path* matches any of the compiled exclude patterns.

    Args:
        path: Path string tested against every regex in the module-level
            ``excluded_rgxs`` list.

    Returns:
        ``True`` if at least one pattern matches, ``False`` otherwise.
    """
    return any(pattern.match(path) for pattern in excluded_rgxs)
def parse_args():
    """Define the command-line interface and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` holding the two positional paths and all
        optional settings with their defaults applied.
    """
    parser = argparse.ArgumentParser(description='sync and encrypt your local directory to WebDav server')
    add = parser.add_argument

    # Positional endpoints.
    add('local_base', type=str, help='Local directory to sync, e.g. /home/bob/myfolder')
    add('remote_base', type=str, help='Base WebDav URL, e.g. https://example.org/dav/myfolder')

    # WebDav credentials.
    add('--webdav-user', '-u', type=str, metavar='USER', help='WebDav Username')
    add('--webdav-password', '-p', type=str, metavar='PASSWORD', help='WebDav Password')
    add('--webdav-password-file', type=str, metavar='PASSWORD_FILE', help='WebDav Password file')
    add('--webdav-password-command', type=str, metavar='PASSWORD_COMMAND', help='WebDav Password command')

    # GPG passphrase sources.
    add('--gpg-passphrase', '-gp', type=str, metavar='PASSPHRASE', help='GPG Passphrase')
    add('--gpg-passphrase-file', type=str, metavar='PASSPHRASE_FILE', help='GPG Passphrase file')
    add('--gpg-passphrase-command', type=str, metavar='PASSPHRASE_COMMAND', help='GPG Passphrase command')

    # Sync behaviour.
    add('--delete', action='store_true', help='delete extraneous files/dirs from remote dirs.')
    add('--delete-excluded', action='store_true', help='Delete excluded files from dest dirs')
    add('--force', '-f', action='store_true', help='Force copying of files. Do not check files modifications')
    add('--timeout', '-t', type=int, default=10, metavar='N',
        help='WebDav operation timeout N seconds. Default: %(default)s')
    add('--exclude', type=str, action='append', metavar='PATTERN', help='exclude files matching PATTERN')
    add('--save-metadata-step', type=int, default=10, metavar='N',
        help='save metadata every N uploaded files. Default: %(default)s')
    add('--no-check-certificate', action='store_true', help='Do not verify SSL certificate')

    # Encryption and compression settings handed to gpg.
    add('--cipher-algo', type=str, default='AES256', metavar='CIPHER',
        help='Cipher algorithm. Default: %(default)s. (IDEA, 3DES, CAST5, BLOWFISH, AES, AES192, AES256, TWOFISH, CAMELLIA128, CAMELLIA192, CAMELLIA256 etc. Check your "gpg" command line help to see what symmetric cipher algorithms are supported)')
    add('--digest-algo', type=str, default='SHA256', metavar='DIGEST',
        help='Digest algorithm. Default: %(default)s. (SHA1, RIPEMD160, SHA256, SHA384, SHA512, SHA224 etc. Check your "gpg" command line help to see what Hash algorithms are supported)')
    add('--compress-algo', type=str, default='none', metavar='ALGO',
        help='Compression algorithm. Default: %(default)s. (zip, zlib, bzip2, none etc. Check your "gpg" command line help to see what compression algorithms are supported)')
    # Kept as a string because it is forwarded verbatim as a gpg argument.
    add('--compress-level', '-z', type=str, default='0', metavar='N',
        help='Set compression level to N. Default: %(default)s')

    add('--verbose', '-v', action='count', default=0, help='verbose (-v,-vv,-vvv)')

    return parser.parse_args()
def load_metadata():
    """Fetch and parse the sync metadata stored in the remote directory.

    Uses the module-level ``webdav`` client and ``METADATA_FILENAME``.

    Returns:
        The decoded JSON content of the remote metadata file, or an empty
        dict when the file does not exist on the server.
    """
    logger.info('Loading metadata from remote dir...')
    if not webdav.exists(METADATA_FILENAME):
        return {}
    # The context manager closes the temp file even if the download or the
    # JSON parsing raises.
    with tempfile.TemporaryFile('wb+') as tmpfile:
        webdav.download_fileobj(METADATA_FILENAME, tmpfile)
        tmpfile.seek(0)
        return json.load(tmpfile)
def upload_metadata(metadata):
    """Serialise *metadata* to JSON and store it in the remote directory.

    Uses the module-level ``webdav`` client and ``METADATA_FILENAME``; an
    existing remote metadata file is overwritten.

    Args:
        metadata: JSON-serialisable mapping to upload.
    """
    logger.info('Uploading metadata to remote dir...')
    # The context manager closes the temp file even if the upload raises.
    with tempfile.TemporaryFile('rb+') as tmpfile:
        tmpfile.write(json.dumps(metadata).encode())
        tmpfile.seek(0)
        webdav.upload_fileobj(tmpfile, METADATA_FILENAME, overwrite=True)
# ============================ CODE
# Name of the bookkeeping file kept in the remote base directory.
METADATA_FILENAME = 'davesync-metadata.json'

args = parse_args()
logger = create_logger()

# Trailing slashes are dropped so paths can be joined with a single '/'.
# A bare '/' would be stripped to '', so fall back to the root itself.
local_base = args.local_base.rstrip('/') or os.sep
remote_base = args.remote_base.rstrip('/')

# Each --exclude glob is translated to a regex once, up front.
excluded_rgxs = [re.compile(fnmatch.translate(pattern)) for pattern in (args.exclude or [])]

assert_on_bad_dir(local_base)

webdav_password = read_webdav_password()
gpg_passphrase = read_gpg_passphrase()
if not gpg_passphrase:
    logger.critical('GPG passphrase is not set')
    sys.exit(1)

gpg = gnupg.GPG()

try:
    webdav = Client(remote_base,
                    auth=(args.webdav_user, webdav_password) if args.webdav_user is not None else None,
                    verify=not args.no_check_certificate,
                    timeout=args.timeout)
    assert_on_bad_webdav_dir('')
except Exception as err:
    # Exception (not BaseException) so that the SystemExit raised by
    # assert_on_bad_webdav_dir and Ctrl-C are not swallowed and re-reported
    # as a bogus "WebDav error".
    logger.critical(f'WebDav error: {err}')
    sys.exit(1)

# Metadata from the previous run, the metadata being built by this run, and
# the number of files uploaded so far.
metadata = load_metadata()
new_metadata = {}
files_uploaded = 0
# traverse local base dir
logger.info('Checking files...')
for root_dir, dirs, files in os.walk(local_base):
    assert_on_bad_dir(root_dir)
    relpath = os.path.relpath(root_dir, local_base)

    # skip excluded dir
    if excluded_path(relpath):
        # Prune the walk as well: otherwise os.walk still descends into the
        # children, which would then be created under a remote parent that
        # was never made.
        dirs[:] = []
        continue
    logger.debug(f"Checking Dir: '{relpath}'...")

    if relpath in ['/', '.']:
        relpath = ''

    # create webdav dir
    if relpath != '':
        if webdav.exists(relpath):
            # A remote non-directory with the same name is replaced.
            if not webdav.isdir(relpath):
                webdav.remove(relpath)
                webdav.mkdir(relpath)
        else:
            webdav.mkdir(relpath)

    # Remote listing of this directory, keyed by item name.
    # NOTE(review): the lookups below assume item['name'] is the path
    # relative to the remote base - confirm against the webdav4 docs.
    dav_items = {item['name']: item for item in webdav.ls(relpath)}

    for filename in files:
        rel_filepath = os.path.join(relpath, filename)

        # skip excluded file (before touching it on disk)
        if excluded_path(rel_filepath):
            continue
        logger.debug(f"Checking file: '{rel_filepath}'...")

        dav_filename = filename + '.gpg'
        dav_filepath = (relpath + '/' + dav_filename).strip("/")
        full_filepath = os.path.join(root_dir, filename)
        modified_time = os.path.getmtime(full_filepath)
        filesize = os.path.getsize(full_filepath)

        new_metadata[dav_filepath] = {'modified': modified_time, 'size': filesize}

        # remove directory from webdav if it has same name as local file
        if dav_filepath in dav_items and dav_items[dav_filepath]['type'] != 'file':
            webdav.remove(dav_filepath)
            # Forget the stale entry so the file is uploaded below even if
            # the stored metadata claims it is unchanged.
            del dav_items[dav_filepath]

        # upload file if it doesn't exist or modified
        previous = metadata.get(dav_filepath)
        needs_upload = (
            args.force
            or dav_filepath not in dav_items
            or previous is None
            or previous.get('modified') != modified_time
            or previous.get('size') != filesize
        )
        if not needs_upload:
            continue

        logger.info(f"Uploading file '{dav_filepath}'...")
        try:
            tempfilepath = encrypt_file(full_filepath)
        except RuntimeError as err:
            logger.critical(err)
            sys.exit(1)
        try:
            webdav.upload_file(tempfilepath, dav_filepath, overwrite=True)
        finally:
            # Always drop the encrypted temp file, even if the upload fails.
            os.unlink(tempfilepath)
        files_uploaded += 1

        # save metadata with modified timestamps of uploaded files
        # (a step of 0 or less disables the periodic save instead of
        # dividing by zero)
        if args.save_metadata_step > 0 and files_uploaded % args.save_metadata_step == 0:
            # Merge with the old metadata so entries for files not visited
            # yet are not lost by an intermediate save.
            upload_metadata({**metadata, **new_metadata})

# Final save: only the files seen in this run.
upload_metadata(new_metadata)
# traverse remote base dir
# --delete removes remote items that are missing locally or excluded (as
# before); --delete-excluded, which used to be parsed but ignored, removes
# the excluded remote items on its own.
if args.delete or args.delete_excluded:
    logger.info('Checking WebDav files...')

    def webdav_scan_recursively(root_dir_dir):
        """Walk the remote directory tree and remove stale items.

        Args:
            root_dir_dir: Remote directory to list; sub-directories that are
                kept are scanned recursively.
        """
        for item in webdav.ls(root_dir_dir):
            item_name = item['name']
            item_type = item['type']
            if item_name == METADATA_FILENAME:
                continue
            logger.debug("Checking WebDav %s '%s'", item_type, item_name)

            # Remote files carry a '.gpg' suffix that the local file (and
            # therefore the exclude patterns) do not have.
            if item_type == 'file':
                local_relpath = remove_str_suffix(item_name, '.gpg')
            else:
                local_relpath = item_name
            local_path = os.path.join(local_base, local_relpath)

            missing_locally = not os.path.exists(local_path)
            if excluded_path(local_relpath) or (args.delete and missing_locally):
                logger.info("Remove %s '%s' from WebDav", item_type, item_name)
                webdav.remove(item_name)
            elif item_type == 'directory':
                webdav_scan_recursively(item_name)

    webdav_scan_recursively('')