This repository has been archived by the owner on Oct 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ypacket.py
120 lines (90 loc) · 2.96 KB
/
ypacket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import struct
from .utils import YDict
# Wire-format constants, listed in the order they appear in an encoded packet.

# Four-byte magic that every packet header must start with.
YMSG_HEADER = b"YMSG"
# Four bytes written immediately after the magic when a packet is encoded
# (presumably the protocol version field -- confirm against the YMSG spec).
YMSG_VER = b"\x00\x13\x00\x00"
# Two-byte delimiter that follows every key and every value in the payload.
YMSG_SEP = b"\xc0\x80"
class InvalidPacket(Exception):
    """Raised when raw bytes cannot be accepted as a YMSG packet header."""
class YPacketError(Exception):
    """Raised on misuse of a packet, e.g. assigning to its read-only length."""
class YPacket(object):
    """A single YMSG packet: a 20-byte header followed by key/value data.

    Header layout, as read by setHeader() and written by encode():

        bytes  0-3    YMSG_HEADER magic
        bytes  4-7    version bytes (YMSG_VER on encode, ignored on parse)
        bytes  8-9    payload length, network-order unsigned short
        bytes 10-11   service, network-order unsigned short
        bytes 12-15   status, network-order unsigned int
        bytes 16-19   session id, kept as raw bytes

    The payload is a flat run of ``key SEP value SEP`` pairs, where SEP is
    YMSG_SEP. The header fields are stored internally as raw bytes and
    converted to integers by the properties.
    """

    def __init__(self, data=None):
        """Create an empty packet, or parse ``data`` when it is given.

        Args:
            data: Optional raw packet bytes (header plus payload). A falsy
                value (None or b'') leaves the packet empty.

        Raises:
            InvalidPacket: If ``data`` does not carry a valid header.
        """
        self._service = b'\x00\x00'
        self._length = b'\x00\x00'
        self._status = b'\x00\x00\x00\x00'
        self._sid = b'\x00\x00\x00\x00'
        self._keyvals = YDict()
        self._data = b''
        if data:
            self.populateFromData(data)

    # setter and getter for sid
    @property
    def sid(self):
        """Session id as stored: raw bytes, no conversion."""
        return self._sid

    @sid.setter
    def sid(self, value):
        self._sid = value

    # setter and getter for service
    @property
    def service(self):
        """Service code as an integer (stored as a 2-byte network-order field)."""
        return struct.unpack('!H', self._service)[0]

    @service.setter
    def service(self, value):
        self._service = struct.pack('!H', value)

    # setter and getter for status
    @property
    def status(self):
        """Status as an integer (stored as a 4-byte network-order field)."""
        return struct.unpack('!I', self._status)[0]

    @status.setter
    def status(self, value):
        self._status = struct.pack('!I', value)

    # setter and getter for length
    @property
    def length(self):
        """Payload length as an integer; read-only from outside the class."""
        return struct.unpack('!H', self._length)[0]

    @length.setter
    def length(self, value):
        # The length is derived from the payload in encode() or taken from a
        # parsed header, so external assignment is rejected.
        raise YPacketError("Packet length cannot be modified from outside")

    # setter and getter for data
    @property
    def data(self):
        """The key/value store of the packet (not the raw payload bytes)."""
        return self._keyvals

    @data.setter
    def data(self, ydict):
        self._keyvals = ydict

    # misc functions
    def _packData(self):
        """Serialise the key/value store into payload bytes.

        Every key and every value is encoded and followed by YMSG_SEP.

        Returns:
            The payload as bytes; b'' when there are no keys.
        """
        parts = []
        for key in self._keyvals:
            parts.append(key.encode())
            parts.append(YMSG_SEP)
            parts.append(self._keyvals[key].encode())
            parts.append(YMSG_SEP)
        # Join once instead of growing a bytes object inside the loop.
        return b''.join(parts)

    def _unpackData(self, raw_data=None):
        """Parse payload bytes into a fresh YDict.

        Args:
            raw_data: Payload bytes to parse. When falsy (None or b''), the
                payload stored on the packet is parsed instead.

        Returns:
            A new YDict mapping decoded keys to decoded values.
        """
        ydict = YDict()
        if not raw_data:
            raw_data = self._data
        fields = iter(raw_data.split(YMSG_SEP))
        # Zipping the iterator with itself yields consecutive (key, value)
        # pairs; an unpaired trailing field (such as the empty piece after
        # the final separator) is dropped.
        for key, val in zip(fields, fields):
            ydict[key.decode()] = val.decode()
        return ydict

    def setHeader(self, headerdata):
        """Load length, service, status and session id from a raw header.

        Args:
            headerdata: Bytes starting with YMSG_HEADER, at least 20 long.

        Raises:
            InvalidPacket: If the magic is missing or fewer than 20 bytes
                were supplied.
        """
        if not headerdata.startswith(YMSG_HEADER) or len(headerdata) < 20:
            raise InvalidPacket("Invalid header")
        # Bytes 4-7 (the version field) are intentionally not stored.
        self._service = headerdata[10:12]
        self._length = headerdata[8:10]
        self._status = headerdata[12:16]
        self._sid = headerdata[16:20]

    def setData(self, rawdata):
        """Store the raw payload and rebuild the key/value store from it."""
        self._data = rawdata
        self._keyvals = self._unpackData()

    def populateFromData(self, data):
        """Parse a complete raw packet (header followed by payload).

        Args:
            data: Raw packet bytes; the first 20 bytes are the header and
                everything after them is treated as the payload.

        Raises:
            InvalidPacket: If the header is malformed or truncated.
        """
        # The original passed an undefined name `header` here, so parsing
        # any packet raised NameError; the header is the first 20 bytes.
        self.setHeader(data[:20])
        self.setData(data[20:])

    def encode(self):
        """Serialise the packet to wire bytes.

        Rebuilds the stored payload from the key/value store and refreshes
        the length field to match it before assembling the result.

        Returns:
            Header plus payload as bytes.
        """
        self._data = self._packData()
        self._length = struct.pack('!H', len(self._data))
        return YMSG_HEADER + YMSG_VER + self._length + self._service \
            + self._status + self._sid + self._data

    def __repr__(self):
        return "<Packet Type={0}, Length={1}b, Status={2}, Data={3}>".format(hex(self.service), self.length, self._status, self._keyvals)