forked from REDxEYE/io_mesh_SourceMDL
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MDLIO_ByteIO.py
307 lines (235 loc) · 7.75 KB
/
MDLIO_ByteIO.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
import binascii
import contextlib
import io
import struct
import typing
from io import BytesIO
class OffsetOutOfBounds(Exception):
    """Raised when a requested offset lies beyond the end of the stream."""
def split(array, n=3):
    """
    Cut a sliceable sequence into consecutive chunks of at most ``n`` items.

    :param array: any object supporting ``len()`` and slicing
    :param n: chunk length; the final chunk may be shorter
    :return: list of slices of ``array``, in order
    """
    chunks = []
    for start in range(0, len(array), n):
        stop = start + n
        chunks.append(array[start:stop])
    return chunks
class ByteIO:
    """
    Thin convenience wrapper around a seekable binary stream.

    Typed values are packed/unpacked with :mod:`struct` using the format
    string exactly as given, so byte order and alignment are the ``struct``
    defaults (native) unless the caller passes a prefixed format.
    """

    def __init__(self, file=None, path=None, byte_object=None, mode='r', copy_data_from_handle=True):
        """
        Wrap an open handle, a file on disk, or an in-memory bytes object.

        The first supplied source among ``file``, ``path`` and ``byte_object``
        is used; when none is given an empty in-memory buffer is created.

        :param file: open binary handle exposing a ``mode`` string
        :type file: typing.BinaryIO
        :param path: file to open when no handle is supplied
        :type path: str
        :param byte_object: initial content of an in-memory buffer
        :type byte_object: bytes
        :param mode: ``open()`` mode used together with ``path``; a ``b`` is
            appended when missing
        :type mode: str
        :param copy_data_from_handle: for read-only handles, slurp the content
            into memory and close the handle instead of keeping it
        :type copy_data_from_handle: bool
        """
        if file is not None:
            read_only = 'r' in file.mode and 'w' not in file.mode
            if read_only and copy_data_from_handle:
                self.file = io.BytesIO(file.read())
                file.close()
            else:
                # Writable handles (and read handles the caller wants kept)
                # are used directly; this also covers 'a'/'x' modes, which
                # previously left ``self.file`` unset.
                self.file = file
        elif path:
            # Avoid building an invalid mode such as 'rbb' when the caller
            # already asked for binary.
            binary_mode = mode if 'b' in mode else mode + 'b'
            if 'r' in mode and 'w' not in mode:
                with open(path, binary_mode) as f:
                    self.file = io.BytesIO(f.read())
            else:
                self.file = open(path, binary_mode)
        elif byte_object:
            self.file = io.BytesIO(byte_object)
        else:
            self.file = BytesIO()

    @contextlib.contextmanager
    def save_current_pos(self):
        """
        Context manager that restores the stream position on exit.

        The position is restored even when the body raises, so a failed
        peek does not leave the cursor in the middle of a record.
        """
        entry = self.tell()
        try:
            yield
        finally:
            self.seek(entry)

    def __repr__(self):
        return "<ByteIO {}/{}>".format(self.tell(), self.size())

    @property
    def preview(self):
        """Up to 64 bytes starting at the current position, without advancing."""
        with self.save_current_pos():
            return self.read_bytes(64)

    @property
    def preview_f(self):
        """Like :attr:`preview`, formatted as hex strings of four bytes each."""
        with self.save_current_pos():
            block = self.read_bytes(64)
            hex_values = split(split(binascii.hexlify(block).decode().upper(), 2), 4)
            return [' '.join(b) for b in hex_values]

    def close(self):
        """
        Close the underlying handle if it was opened for writing.

        In-memory buffers (no ``mode`` attribute) and read-mode handles are
        left open.
        """
        handle_mode = getattr(self.file, 'mode', '')
        if any(flag in handle_mode for flag in ('w', 'a', 'x')):
            self.file.close()

    def rewind(self, amount):
        """Move the position ``amount`` bytes backwards."""
        self.file.seek(-amount, io.SEEK_CUR)

    def skip(self, amount):
        """Move the position ``amount`` bytes forwards."""
        self.file.seek(amount, io.SEEK_CUR)

    def seek(self, off, pos=io.SEEK_SET):
        """Seek to ``off`` relative to ``pos`` (an ``io.SEEK_*`` constant)."""
        self.file.seek(off, pos)

    def tell(self):
        """Return the current stream position."""
        return self.file.tell()

    def size(self):
        """Return the total stream length, leaving the position unchanged."""
        curr_offset = self.tell()
        self.seek(0, io.SEEK_END)
        ret = self.tell()
        self.seek(curr_offset, io.SEEK_SET)
        return ret

    def fill(self, amount):
        """Write ``amount`` zero bytes at the current position."""
        # One write instead of ``amount`` single-byte writes.
        self._write(b'\x00' * amount)

    def insert_begin(self, to_insert):
        """
        Prepend ``to_insert`` to the whole content.

        The content is rebuilt in a fresh in-memory buffer that replaces the
        previous stream; the position ends up at 0.
        """
        self.seek(0)
        buffer = self._read(-1)
        self.file = BytesIO()
        self.file.write(to_insert)
        self.file.write(buffer)
        self.file.seek(0)

    # ------------ PEEK SECTION ------------ #

    def _peek(self, size=1):
        """Read ``size`` raw bytes without advancing the position."""
        with self.save_current_pos():
            return self._read(size)

    def peek(self, t):
        """Unpack one value of struct format ``t`` without advancing."""
        size = struct.calcsize(t)
        return struct.unpack(t, self._peek(size))[0]

    def peek_fmt(self, fmt):
        """Unpack struct format ``fmt`` without advancing; returns a tuple."""
        size = struct.calcsize(fmt)
        return struct.unpack(fmt, self._peek(size))

    def peek_uint64(self):
        return self.peek('Q')

    def peek_int64(self):
        return self.peek('q')

    def peek_uint32(self):
        return self.peek('I')

    def peek_int32(self):
        return self.peek('i')

    def peek_uint16(self):
        return self.peek('H')

    def peek_int16(self):
        return self.peek('h')

    def peek_uint8(self):
        return self.peek('B')

    def peek_int8(self):
        return self.peek('b')

    def peek_float(self):
        return self.peek('f')

    def peek_double(self):
        return self.peek('d')

    def peek_fourcc(self):
        """Return the next four bytes as a string without advancing."""
        with self.save_current_pos():
            return self.read_ascii_string(4)

    # ------------ READ SECTION ------------ #

    def _read(self, size=-1) -> bytes:
        """Read ``size`` raw bytes (everything remaining when ``size`` is -1)."""
        return self.file.read(size)

    def read(self, t):
        """
        Read and unpack one value of struct format ``t``.

        :raises struct.error: when fewer bytes remain than the format needs
        """
        size = struct.calcsize(t)
        return struct.unpack(t, self._read(size))[0]

    def read_fmt(self, fmt):
        """Read and unpack struct format ``fmt``; returns a tuple."""
        size = struct.calcsize(fmt)
        return struct.unpack(fmt, self._read(size))

    def read_uint64(self):
        return self.read('Q')

    def read_int64(self):
        return self.read('q')

    def read_uint32(self):
        return self.read('I')

    def read_int32(self):
        return self.read('i')

    def read_uint16(self):
        return self.read('H')

    def read_int16(self):
        return self.read('h')

    def read_uint8(self):
        return self.read('B')

    def read_int8(self):
        return self.read('b')

    def read_float(self):
        return self.read('f')

    def read_double(self):
        return self.read('d')

    def read_ascii_string(self, length=None):
        """
        Read a string, mapping each byte to the code point of the same value.

        :param length: when given, read exactly that many bytes and strip
            leading/trailing NUL bytes; when ``None``, read up to (and
            consume) the next NUL terminator
        :return: the decoded string
        :raises struct.error: when the stream ends before the string does
        """
        # ``is not None`` so that an explicit length of 0 yields '' instead of
        # silently falling through to the NUL-terminated branch.
        if length is not None:
            if length <= 0:
                return ''
            raw = self._read(length)
            if len(raw) < length:
                raise struct.error(
                    'expected {} bytes for string, got {}'.format(length, len(raw)))
            return raw.strip(b'\x00').decode('latin-1')
        acc = bytearray()
        b = self.read_uint8()
        while b != 0:
            acc.append(b)
            b = self.read_uint8()
        return acc.decode('latin-1')

    def read_fourcc(self):
        """Read a four-byte tag as a string."""
        return self.read_ascii_string(4)

    def read_from_offset(self, offset, reader, **reader_args):
        """
        Call ``reader(**reader_args)`` at ``offset`` and restore the position.

        :raises OffsetOutOfBounds: when ``offset`` is past the end of the stream
        """
        if offset > self.size():
            raise OffsetOutOfBounds()
        with self.save_current_pos():
            self.seek(offset, io.SEEK_SET)
            return reader(**reader_args)

    # ------------ WRITE SECTION ------------ #

    def _write(self, data):
        """Write raw bytes at the current position."""
        self.file.write(data)

    def write(self, t, value):
        """Pack ``value`` with struct format ``t`` and write it."""
        self._write(struct.pack(t, value))

    def write_uint64(self, value):
        self.write('Q', value)

    def write_int64(self, value):
        self.write('q', value)

    def write_uint32(self, value):
        self.write('I', value)

    def write_int32(self, value):
        self.write('i', value)

    def write_uint16(self, value):
        self.write('H', value)

    def write_int16(self, value):
        self.write('h', value)

    def write_uint8(self, value):
        self.write('B', value)

    def write_int8(self, value):
        self.write('b', value)

    def write_float(self, value):
        self.write('f', value)

    def write_double(self, value):
        self.write('d', value)

    def write_ascii_string(self, string, zero_terminated=False, length=-1):
        """
        Write ``string`` encoded as ASCII.

        :param zero_terminated: append a single NUL byte after the text
        :param length: when not -1 (and ``zero_terminated`` is false), pad
            with NUL bytes up to this many bytes; longer text is not truncated
        """
        pos = self.tell()
        self._write(string.encode('ascii'))
        if zero_terminated:
            self._write(b'\x00')
        elif length != -1:
            to_fill = length - (self.tell() - pos)
            if to_fill > 0:
                self.fill(to_fill)

    def write_fourcc(self, fourcc):
        """Write a tag string as ASCII, without terminator or padding."""
        self.write_ascii_string(fourcc)

    def write_to_offset(self, offset, writer, value, fill_to_target=False):
        """
        Call ``writer(value)`` at ``offset`` and restore the position.

        :param fill_to_target: allow ``offset`` to lie past the current end
        :raises OffsetOutOfBounds: when ``offset`` is past the end of the
            stream and ``fill_to_target`` is false
        """
        if offset > self.size() and not fill_to_target:
            raise OffsetOutOfBounds()
        # Restore the cursor even if the writer raises.
        with self.save_current_pos():
            self.seek(offset, io.SEEK_SET)
            return writer(value)

    def read_bytes(self, size):
        """Read ``size`` raw bytes."""
        return self._read(size)

    def read_float16(self):
        return self.read('e')

    def write_bytes(self, data):
        """Write raw bytes."""
        self._write(data)
if __name__ == '__main__':
    # Manual smoke test: write a four-character tag to disk, then load the
    # file back and peek at it as an unsigned 32-bit integer.
    test_path = r'./test.bin'

    writer = ByteIO(path=test_path, mode='w')
    writer.write_fourcc("IDST")
    writer.close()

    reader = ByteIO(file=open(test_path, mode='rb'))
    print(reader.peek_uint32())