

from scapy.layers.inet import *
from scapy.compat import binascii
from scapy.layers.inet import *
from scapy.layers.l2 import *
from scapy.packet import *
from scapy.sendrecv import *
# cap packets and parse protocol
# common cap sniff data
# packets check and summary
# return bool filter functions
# Capture UDP traffic with scapy's sniff(); no count or timeout is passed.
#   filter  - capture filter expression handed to sniff()
#   lfilter - per-packet predicate: keep only packets containing a UDP layer
#   prn     - per-packet callback: produce the packet's one-line summary
packets = sniff(
    filter="udp",
    lfilter=lambda pkt: UDP in pkt,
    prn=lambda pkt: pkt.summary(),
)
# Commonly used packet filters
from ctypes import (Structure, addressof, c_char_p, c_uint8, c_uint16,
c_uint32, c_void_p, memmove, sizeof)
import crcmod
from scapy.compat import binascii
from scapy.layers.inet import *
from scapy.packet import *
from scapy.sendrecv import *
class IspCommonField(Structure):
    """Fixed 14-byte header found at the start of every ISP frame.

    The layout is byte-packed (``_pack_ = 1``), so the ctypes structure maps
    directly onto the first bytes of a frame.
    NOTE(review): apart from ``start`` and ``mcp_layer`` the field meanings
    are only known from their names - confirm against the protocol spec.
    """

    _pack_ = 1
    _fields_ = [
        # Leading frame byte; the parsers in this file always put 0x7e here.
        ("start", c_uint8),
        ("ap_layer", c_uint8),
        ("vp_layer", c_uint8),
        ("station_numbers", c_uint32),
        ("device_numbers", c_uint8),
        ("packets_numbers", c_uint16),
        ("vp_layer_interact", c_uint8),
        # Later parsers in this file treat 1 as MCPA and 3 as MCPC.
        ("mcp_layer", c_uint8),
        ("cmd", c_uint8),
        ("response", c_uint8),
    ]
class IspMCPA(Structure):
    """Byte-packed MCPA record: 1-byte length, 2-byte tag and a C string pointer."""

    _pack_ = 1
    _fields_ = [
        ("len", c_uint8),
        ("tag", c_uint16),
        # Pointer-sized field, so sizeof() depends on the platform.
        ("value", c_char_p),
    ]
class IspMCPC(Structure):
    """Byte-packed MCPC record: 1-byte length, 4-byte tag and a C string pointer."""

    _pack_ = 1
    _fields_ = [
        ("len", c_uint8),
        ("tag", c_uint32),
        # Pointer-sized field, so sizeof() depends on the platform.
        ("value", c_char_p),
    ]
class IspEndField(Structure):
    """Fixed 3-byte trailer of an ISP frame: 16-bit CRC, then the end byte."""

    _pack_ = 1
    _fields_ = [
        ("crc", c_uint16),
        # Last byte of the frame; the parsers in this file always put 0x7e here.
        ("end", c_uint8),
    ]
# crc code calu
# isp protocol framework
class IspProtocol(Structure):
    """First-draft ISP frame parser for frames given as ASCII-hex text.

    ``deserdes`` strips the ``7e`` delimiters, undoes the byte escaping,
    decodes the fixed header (``IspCommonField``) and trailer
    (``IspEndField``) and verifies the CRC.  PDU parsing is not implemented
    in this version: nothing ever sets ``__isMCPA__``/``__isMCPC__`` or fills
    the PDU lists.
    """

    __isMCPA__ = False
    __isMCPC__ = False
    # Class-level defaults; deserdes() binds fresh per-instance copies so that
    # two parser objects never overwrite each other's decoded header/trailer.
    __IspCommonField__ = IspCommonField()
    __IspCommonFieldLen__ = sizeof(__IspCommonField__)
    __IspEndField__ = IspEndField()
    __IspEndFieldLen__ = sizeof(__IspEndField__)
    __MCPA_LIST__ = []
    __MCPC_LIST__ = []
    __crc16__ = 0x0
    __crcFlag__ = False

    def deserdes(self, msg: bytes):
        """Decode one hex-text frame and verify its CRC.

        Args:
            msg: the whole frame as ASCII hex digits (upper or lower case),
                including the leading and trailing ``7e``.

        Returns:
            True when the CRC stored in the trailer matches the CRC computed
            over everything between the leading ``7e`` and the trailer,
            otherwise False (also for frames too short to hold header and
            trailer).

        Raises:
            binascii.Error: if ``msg`` is not valid hex text.
        """
        esc_msg = self.__escape_recv_packets(msg)
        frame = binascii.unhexlify(("7e" + esc_msg + "7e").encode())

        head_len = self.__IspCommonFieldLen__
        tail_len = self.__IspEndFieldLen__
        if len(frame) < head_len + tail_len:
            # Copying a fixed-size struct out of a shorter buffer would read
            # past the end of the data.
            print("ISP frame too short.drop this packets ... ...")
            return False

        self.__IspCommonField__ = IspCommonField.from_buffer_copy(frame[:head_len])
        # Payload PDU part: not parsed in this version.
        self.__IspEndField__ = IspEndField.from_buffer_copy(frame[-tail_len:])

        # The CRC covers the frame without the leading 0x7e and the trailer.
        self.__crc16__ = self.__crc16(frame[1:-tail_len])
        if self.__IspEndField__.crc != self.__crc16__:
            print("ISP crc code parse error.drop this packets ... ...")
            return False
        return True

    def __crc16(self, code: bytes):
        """Return the CRC16 (poly 0x11021, init 0, not reflected) of ``code``."""
        return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

    def __escape_recv_packets(self, msg: bytes):
        """Strip the ``7e`` delimiters and undo the escaping.

        ``5e 5d`` decodes to ``5e`` and ``5e 7d`` decodes to ``7e``.  Returns
        the unescaped body as lower-case hex text.
        """
        # Indexing bytes yields ints, so compare two-character slices instead.
        if msg[0:2].lower() != b"7e" or msg[-2:].lower() != b"7e":
            print("error:escap packets")
        # Unescape on real bytes: replacing inside the hex text could match a
        # pattern that straddles two bytes (e.g. "05 e5 d0").
        body = binascii.unhexlify(msg[2:-2])
        # Split on one escape pair and rewrite the other inside each piece so
        # that a freshly decoded 0x5e is never re-read as the start of an
        # escape (two chained replaces would turn "5e 5d 7d" into "7e").
        body = b"\x5e".join(
            piece.replace(b"\x5e\x7d", b"\x7e") for piece in body.split(b"\x5e\x5d")
        )
        return binascii.hexlify(body).decode()

    def payload(self):
        """Return the MCPA or MCPC list depending on which flag is set, else None."""
        if self.__isMCPA__:
            return self.__MCPA_LIST__
        if self.__isMCPC__:
            return self.__MCPC_LIST__

    def crc16(self):
        """Return the CRC computed by the most recent ``deserdes`` call."""
        return self.__crc16__

    def show(self):
        """Placeholder; not implemented."""
        pass
# Sample frame whose header has mcp_layer == 3 (upper-case hex text).
buf = b"7E03010100000000160000030200FF090000000503BF050000C0050000C1050000C2050000C3050000C4050000C5050000C6050000C7050000C8050000C9050000CA050000CB050000CD050000CE050000CF050000D2050000D3050000D70500000120000002200000062000000920000001210000022100000621000009210000022200000322000006220000072200000A2200000B2200000C2200000D2200000E2200000F22000001230000022300000323000011230000122300001323000021230000222300002323000031230000322300003323000051230000522300005323000061230000622300006323000002250000042500000A2500000326000004260000052600001326000092A47E"
# Sample frame whose header has mcp_layer == 1; this assignment replaces the
# one above, so it is the frame that actually gets parsed.
buf = b"7e03010000000000020000010200d90900010102000300040005000600070008000a000b001000110012001300140015001600170018001900200021002200230024003000310032003300490075005c005d005e5d005f00010102011001110112011301140115012001300131013301340136013701380139014101420143014401500151015201530172010102020204020502080209020f02100211022002210222022302240225022802010302030403050308030903100311030f032003210322032303240325032803a204530454047308a005a105a205a3057c08b005b105b205b305c0056d1b7e"

# IspProtocol declares no _fields_ of its own; hdr_len is not used afterwards.
hdr_len = sizeof(IspProtocol)
hdr = IspProtocol()
hdr.deserdes(buf)
# Debug OK
import enum
from ctypes import (Structure, addressof, c_char, c_char_p, c_uint8, c_uint16,
c_uint32, c_void_p, memmove, sizeof)
import crcmod
from scapy.compat import binascii
from scapy.layers.inet import *
from scapy.packet import *
from scapy.sendrecv import *
class pduType(enum.Enum):
    """PDU flavour of a frame; the values equal the header's ``mcp_layer`` byte."""

    MCPA = 1
    MCPC = 3
class IspCommonField(Structure):
    """Byte-packed 14-byte header that opens every ISP frame.

    NOTE(review): only ``start`` and ``mcp_layer`` are interpreted by the code
    in this file; the other field meanings are taken from their names alone.
    """

    _pack_ = 1
    _fields_ = [
        # First frame byte; deserdes() always supplies 0x7e.
        ("start", c_uint8),
        ("ap_layer", c_uint8),
        ("vp_layer", c_uint8),
        ("station_numbers", c_uint32),
        ("device_numbers", c_uint8),
        ("packets_numbers", c_uint16),
        ("vp_layer_interact", c_uint8),
        # 1 -> pduType.MCPA, 3 -> pduType.MCPC (see IspProtocol.deserdes).
        ("mcp_layer", c_uint8),
        ("cmd", c_uint8),
        ("response", c_uint8),
    ]
class mcpa_pdu(Structure):
    """Fixed 3-byte head of an MCPA PDU: 1-byte length followed by a 2-byte tag.

    ``value`` is not a ctypes field; the parser attaches it to each instance
    as an ordinary Python attribute after decoding the head.
    """

    _pack_ = 1
    _fields_ = [
        ("len", c_uint8),
        ("tag", c_uint16),
    ]
    value: bytes
class mcpc_pdu(Structure):
    """Fixed 5-byte head of an MCPC PDU: 1-byte length followed by a 4-byte tag.

    ``value`` is not a ctypes field; the parser attaches it to each instance
    as an ordinary Python attribute after decoding the head.
    """

    _pack_ = 1
    _fields_ = [
        ("len", c_uint8),
        ("tag", c_uint32),
    ]
    value: bytes
class IspEndField(Structure):
    """Byte-packed 3-byte frame trailer: 16-bit CRC plus the closing byte."""

    _pack_ = 1
    _fields_ = [
        ("crc", c_uint16),
        # Final frame byte; deserdes() always supplies 0x7e.
        ("end", c_uint8),
    ]
# crc code calu
# isp protocol framework
class IspProtocol(Structure):
    """Parse one ISP frame supplied as ASCII-hex text (e.g. ``b"7e03...7e"``).

    Frame layout as decoded by ``deserdes``::

        7e | IspCommonField | PDU, PDU, ... | IspEndField (CRC16 + 7e)

    Each PDU starts with a 1-byte length that counts the whole PDU, followed
    by a tag (2 bytes for MCPA, 4 bytes for MCPC) and the value.  Parsed PDUs
    expose ``len``, ``tag`` and ``value``; ``value`` stays hex text (bytes of
    ASCII hex digits), i.e. two characters per payload byte.
    """

    # Class-level defaults; deserdes() binds fresh per-instance copies so that
    # two parser objects never overwrite each other's decoded header/trailer.
    __IspCommonField__ = IspCommonField()
    __IspCommonFieldLen__ = sizeof(__IspCommonField__)
    __IspEndField__ = IspEndField()
    __IspEndFieldLen__ = sizeof(__IspEndField__)
    __crc16__ = 0x0
    __crcFlag__ = False
    __pdu_type: pduType = None
    __pdu__ = {}

    def deserdes(self, msg: bytes):
        """Decode one hex-text frame, collect its PDUs and verify the CRC.

        Args:
            msg: the whole frame as ASCII hex digits (upper or lower case),
                including the leading and trailing ``7e``.

        Returns:
            True when the CRC in the trailer matches the computed CRC,
            otherwise False (also for frames too short to hold header and
            trailer).

        Raises:
            binascii.Error: if ``msg`` is not valid hex text.
        """
        # Start from a clean slate so a failed or unknown frame never exposes
        # results left over from an earlier call.
        self.__pdu_type = None
        self.__pduList__ = []
        self.__crc16__ = 0x0

        msg = msg.lower()
        esc_msg = ("7e" + self.__escape_recv_packets(msg) + "7e").encode()

        # Everything below works on hex text: two characters per byte.
        head_hex = self.__IspCommonFieldLen__ * 2
        tail_hex = self.__IspEndFieldLen__ * 2
        if len(esc_msg) < head_hex + tail_hex:
            print("ISP frame too short.drop this packets ... ...")
            return False

        self.__IspCommonField__ = IspCommonField.from_buffer_copy(
            binascii.unhexlify(esc_msg[:head_hex])
        )

        pdu_cls = None
        if self.__IspCommonField__.mcp_layer == 3:
            self.__pdu_type = pduType.MCPC
            pdu_cls = mcpc_pdu
        elif self.__IspCommonField__.mcp_layer == 1:
            self.__pdu_type = pduType.MCPA
            pdu_cls = mcpa_pdu

        # Payload PDU part: only parsed when the PDU flavour is known.
        if pdu_cls is not None:
            self.__parse_pdus(esc_msg[head_hex:-tail_hex], pdu_cls)

        self.__IspEndField__ = IspEndField.from_buffer_copy(
            binascii.unhexlify(esc_msg[-tail_hex:])
        )
        # The CRC covers the frame without the leading 7e and the trailer.
        self.__crc16__ = self.__crc16(binascii.unhexlify(esc_msg[2:-tail_hex]))
        if self.__IspEndField__.crc != self.__crc16__:
            print("ISP crc code parse error.drop this packets ... ...")
            return False
        return True

    def __parse_pdus(self, pdu_part: bytes, pdu_cls):
        """Split the hex-text PDU area into ``pdu_cls`` objects (appended to the list)."""
        # Length byte plus the complete tag.  Decoding fewer bytes than the
        # struct holds would drop the tag's top byte and read past the data.
        head_hex = sizeof(pdu_cls) * 2
        pos = 0
        while pos < len(pdu_part):
            head = pdu_part[pos : pos + head_hex]
            if len(head) < head_hex:
                print("ISP pdu header truncated.stop parsing pdu ... ...")
                break
            pdu = pdu_cls.from_buffer_copy(binascii.unhexlify(head))
            pdu_hex = pdu.len * 2
            if pdu_hex < head_hex:
                # A length smaller than the head (in particular 0) would
                # never advance the cursor correctly.
                print("ISP pdu length invalid.stop parsing pdu ... ...")
                break
            pdu.value = pdu_part[pos + head_hex : pos + pdu_hex]
            self.__pduList__.append(pdu)
            pos += pdu_hex

    def __crc16(self, code: bytes):
        """Return the CRC16 (poly 0x11021, init 0, not reflected) of ``code``."""
        return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

    def __escape_recv_packets(self, msg: bytes):
        """Strip the ``7e`` delimiters and undo the escaping.

        ``5e 5d`` decodes to ``5e`` and ``5e 7d`` decodes to ``7e``.  Returns
        the unescaped body as lower-case hex text.
        """
        if msg[0:2] != b"7e" or msg[-2:] != b"7e":
            print("escape packets error")
        # Unescape on real bytes: replacing inside the hex text could match a
        # pattern that straddles two bytes (e.g. "05 e5 d0").
        body = binascii.unhexlify(msg[2:-2])
        # Split on one escape pair and rewrite the other inside each piece so
        # that a freshly decoded 0x5e is never re-read as the start of an
        # escape (two chained replaces would turn "5e 5d 7d" into "7e").
        body = b"\x5e".join(
            piece.replace(b"\x5e\x7d", b"\x7e") for piece in body.split(b"\x5e\x5d")
        )
        return binascii.hexlify(body).decode()

    def payload(self):
        """Return the list of PDUs collected by the last ``deserdes`` call."""
        return self.__pduList__

    def crc16(self):
        """Return the CRC computed by the most recent ``deserdes`` call."""
        return self.__crc16__

    def show(self):
        """Placeholder; not implemented."""
        pass
# Sample frame whose header has mcp_layer == 3 (upper-case hex text).
buf = b"7E03010100000000160000030200FF090000000503BF050000C0050000C1050000C2050000C3050000C4050000C5050000C6050000C7050000C8050000C9050000CA050000CB050000CD050000CE050000CF050000D2050000D3050000D70500000120000002200000062000000920000001210000022100000621000009210000022200000322000006220000072200000A2200000B2200000C2200000D2200000E2200000F22000001230000022300000323000011230000122300001323000021230000222300002323000031230000322300003323000051230000522300005323000061230000622300006323000002250000042500000A2500000326000004260000052600001326000092A47E"
# Alternative sample with mcp_layer == 1; uncomment to parse it instead.
# buf = b"7e03010000000000020000010200d90900010102000300040005000600070008000a000b001000110012001300140015001600170018001900200021002200230024003000310032003300490075005c005d005e5d005f00010102011001110112011301140115012001300131013301340136013701380139014101420143014401500151015201530172010102020204020502080209020f02100211022002210222022302240225022802010302030403050308030903100311030f032003210322032303240325032803a204530454047308a005a105a205a3057c08b005b105b205b305c0056d1b7e"

# IspProtocol declares no _fields_ of its own; hdr_len is not used afterwards.
hdr_len = sizeof(IspProtocol)
hdr = IspProtocol()
hdr.deserdes(buf)

# Report the computed CRC, then one line per parsed PDU.
print("this packets:%04x" % hdr.crc16())
for it in hdr.payload():
    print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
# Payload parsing
# Parse `buf` again and, for every PDU whose tag is 0x9, walk its value as a
# list of tag identifiers and print each one.
# NOTE(review): this cell uses mcpa_tag/mcpc_tag and IspProtocol.PduType()/
# pduTagLen(), which are only defined further down in this file - it cannot
# run top-to-bottom as written.
# NOTE(review): `start` is initialised once and never reset between PDUs, and
# `tp` is not used - confirm whether that is intended.
hdr = IspProtocol()
hdr.deserdes(buf)
print("this packets:%04x" % hdr.crc16())
tp = 4
start = 0
_tag = mcpc_tag()
for it in hdr.payload():
    print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
    if it.tag != 0x9:
        continue
    _len = len(it.value)
    # Pick the struct that receives each decoded entry.
    if hdr.PduType() == pduType.MCPA:
        _tag = mcpa_tag()
    elif hdr.PduType() == pduType.MCPC:
        _tag = mcpc_tag()
    while _len > 0:
        # The value is hex text here: decode one entry and copy it into _tag.
        memmove(
            addressof(_tag),
            binascii.unhexlify(it.value[start : start + hdr.pduTagLen()]),
            int(hdr.pduTagLen() / 2),
        )
        print("tag:%08x" % _tag.tag)
        start += hdr.pduTagLen()
        _len -= hdr.pduTagLen()
# Protocol parser upgraded to handle raw hex bytes
from scapy.compat import binascii
import logging
import crcmod
import enum
from ctypes import (
Structure,
addressof,
c_uint8,
c_uint16,
c_uint32,
memmove,
sizeof,
)
# Fetch the logger named "scapy" and let INFO-level (and above) records through.
logger = logging.getLogger("scapy")
logger.setLevel(logging.INFO)
# cap packets and parse protocol
# common cap sniff data
# packets check and summary
# return bool filter functions
class pduType(enum.Enum):
    """Kind of PDU carried by a frame; values match the header's ``mcp_layer``."""

    MCPA = 1
    MCPC = 3
class IspCommonField(Structure):
    """14-byte, byte-packed header at the front of every ISP frame.

    NOTE(review): the parsers below only interpret ``start`` and
    ``mcp_layer``; the remaining field meanings come from their names alone.
    """

    _pack_ = 1
    _fields_ = [
        # Opening frame byte; the parsers always place 0x7e here.
        ("start", c_uint8),
        ("ap_layer", c_uint8),
        ("vp_layer", c_uint8),
        ("station_numbers", c_uint32),
        ("device_numbers", c_uint8),
        ("packets_numbers", c_uint16),
        ("vp_layer_interact", c_uint8),
        # Selects the PDU flavour: 1 -> pduType.MCPA, 3 -> pduType.MCPC.
        ("mcp_layer", c_uint8),
        ("cmd", c_uint8),
        ("response", c_uint8),
    ]
class mcpa_tag(Structure):
    """Head of an MCPA PDU: 1-byte length and 2-byte tag, byte-packed (3 bytes).

    ``value`` is only an annotation, not a ctypes field; the parser assigns it
    on each instance after decoding the head.
    """

    _pack_ = 1
    _fields_ = [
        ("len", c_uint8),
        ("tag", c_uint16),
    ]
    value: bytes
class mcpc_tag(Structure):
    """Head of an MCPC PDU: 1-byte length and 4-byte tag, byte-packed (5 bytes).

    ``value`` is only an annotation, not a ctypes field; the parser assigns it
    on each instance after decoding the head.
    """

    _pack_ = 1
    _fields_ = [
        ("len", c_uint8),
        ("tag", c_uint32),
    ]
    value: bytes
class mcpa_idx(Structure):
    """A bare 2-byte MCPA tag identifier, as listed inside a PDU value."""

    _pack_ = 1
    _fields_ = [
        ("tag", c_uint16),
    ]
class mcpc_idx(Structure):
    """A bare 4-byte MCPC tag identifier, as listed inside a PDU value."""

    _pack_ = 1
    _fields_ = [
        ("tag", c_uint32),
    ]
class IspEndField(Structure):
    """3-byte, byte-packed frame trailer: 16-bit CRC and the closing byte."""

    _pack_ = 1
    _fields_ = [
        ("crc", c_uint16),
        # Closing frame byte; the parsers always place 0x7e here.
        ("end", c_uint8),
    ]
# crc code calu
# isp protocol framework
# only for text parse protocol
class IspProtocolForLog(Structure):
    """Parse one ISP frame given as ASCII-hex text, e.g. copied from a log.

    Frame layout as decoded by ``deserdes``::

        7e | IspCommonField | PDU, PDU, ... | IspEndField (CRC16 + 7e)

    Each PDU starts with a 1-byte length that counts the whole PDU, followed
    by a tag (2 bytes for MCPA, 4 bytes for MCPC) and the value.  Parsed PDUs
    are ``mcpa_tag``/``mcpc_tag`` objects exposing ``len``, ``tag`` and
    ``value``; ``value`` stays hex text (bytes of ASCII hex digits).
    """

    # Class-level defaults; deserdes() binds fresh per-instance copies so that
    # two parser objects never overwrite each other's decoded header/trailer.
    __IspCommonField__ = IspCommonField()
    __IspCommonFieldLen__ = sizeof(__IspCommonField__)
    __IspEndField__ = IspEndField()
    __IspEndFieldLen__ = sizeof(__IspEndField__)
    __crc16__ = 0x0
    __crcFlag__ = False
    __pdu_type: pduType = None
    __pdu__ = {}

    def deserdes(self, msg: bytes):
        """Decode one hex-text frame, collect its PDUs and verify the CRC.

        Args:
            msg: the whole frame as ASCII hex digits (upper or lower case),
                including the leading and trailing ``7e``.

        Returns:
            True when the CRC in the trailer matches the computed CRC,
            otherwise False (also for frames too short to hold header and
            trailer).

        Raises:
            binascii.Error: if ``msg`` is not valid hex text.
        """
        # Start from a clean slate so a failed or unknown frame never exposes
        # results left over from an earlier call.
        self.__pdu_type = None
        self.__pduList__ = []
        self.__crc16__ = 0x0

        msg = msg.lower()
        esc_msg = ("7e" + self.__escape_recv_packets(msg) + "7e").encode()

        # Everything below works on hex text: two characters per byte.
        head_hex = self.__IspCommonFieldLen__ * 2
        tail_hex = self.__IspEndFieldLen__ * 2
        if len(esc_msg) < head_hex + tail_hex:
            print("ISP frame too short.drop this packets ... ...")
            return False

        self.__IspCommonField__ = IspCommonField.from_buffer_copy(
            binascii.unhexlify(esc_msg[:head_hex])
        )

        # mcpa_tag/mcpc_tag have the same layout as the mcpa_pdu/mcpc_pdu
        # structs of the earlier draft and are the ones defined next to this
        # class.
        pdu_cls = None
        if self.__IspCommonField__.mcp_layer == 3:
            self.__pdu_type = pduType.MCPC
            pdu_cls = mcpc_tag
        elif self.__IspCommonField__.mcp_layer == 1:
            self.__pdu_type = pduType.MCPA
            pdu_cls = mcpa_tag

        # Payload PDU part: only parsed when the PDU flavour is known.
        if pdu_cls is not None:
            self.__parse_pdus(esc_msg[head_hex:-tail_hex], pdu_cls)

        self.__IspEndField__ = IspEndField.from_buffer_copy(
            binascii.unhexlify(esc_msg[-tail_hex:])
        )
        # The CRC covers the frame without the leading 7e and the trailer.
        self.__crc16__ = self.__crc16(binascii.unhexlify(esc_msg[2:-tail_hex]))
        if self.__IspEndField__.crc != self.__crc16__:
            print("ISP crc code parse error.drop this packets ... ...")
            return False
        return True

    def __parse_pdus(self, pdu_part: bytes, pdu_cls):
        """Split the hex-text PDU area into ``pdu_cls`` objects (appended to the list)."""
        # Length byte plus the complete tag.  Decoding fewer bytes than the
        # struct holds would drop the tag's top byte and read past the data.
        head_hex = sizeof(pdu_cls) * 2
        pos = 0
        while pos < len(pdu_part):
            head = pdu_part[pos : pos + head_hex]
            if len(head) < head_hex:
                print("ISP pdu header truncated.stop parsing pdu ... ...")
                break
            pdu = pdu_cls.from_buffer_copy(binascii.unhexlify(head))
            pdu_hex = pdu.len * 2
            if pdu_hex < head_hex:
                # A length smaller than the head (in particular 0) would
                # never advance the cursor correctly.
                print("ISP pdu length invalid.stop parsing pdu ... ...")
                break
            pdu.value = pdu_part[pos + head_hex : pos + pdu_hex]
            self.__pduList__.append(pdu)
            pos += pdu_hex

    def __crc16(self, code: bytes):
        """Return the CRC16 (poly 0x11021, init 0, not reflected) of ``code``."""
        return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

    def __escape_recv_packets(self, msg: bytes):
        """Strip the ``7e`` delimiters and undo the escaping.

        ``5e 5d`` decodes to ``5e`` and ``5e 7d`` decodes to ``7e``.  Returns
        the unescaped body as lower-case hex text.
        """
        if msg[0:2] != b"7e" or msg[-2:] != b"7e":
            print("escape packets error")
        # Unescape on real bytes: replacing inside the hex text could match a
        # pattern that straddles two bytes (e.g. "05 e5 d0").
        body = binascii.unhexlify(msg[2:-2])
        # Split on one escape pair and rewrite the other inside each piece so
        # that a freshly decoded 0x5e is never re-read as the start of an
        # escape (two chained replaces would turn "5e 5d 7d" into "7e").
        body = b"\x5e".join(
            piece.replace(b"\x5e\x7d", b"\x7e") for piece in body.split(b"\x5e\x5d")
        )
        return binascii.hexlify(body).decode()

    def payload(self):
        """Return the list of PDUs collected by the last ``deserdes`` call."""
        return self.__pduList__

    def crc16(self):
        """Return the CRC computed by the most recent ``deserdes`` call."""
        return self.__crc16__

    def show(self):
        """Placeholder; not implemented."""
        pass
def example_IspLogprotolparse():
    """Demo: parse a hex-text sample frame with IspProtocolForLog and print it.

    Prints the computed CRC followed by one line per parsed PDU.
    """
    # Sample frame whose header has mcp_layer == 3 (upper-case hex text).
    sample = b"7E03010100000000160000030200FF090000000503BF050000C0050000C1050000C2050000C3050000C4050000C5050000C6050000C7050000C8050000C9050000CA050000CB050000CD050000CE050000CF050000D2050000D3050000D70500000120000002200000062000000920000001210000022100000621000009210000022200000322000006220000072200000A2200000B2200000C2200000D2200000E2200000F22000001230000022300000323000011230000122300001323000021230000222300002323000031230000322300003323000051230000522300005323000061230000622300006323000002250000042500000A2500000326000004260000052600001326000092A47E"
    # Alternative sample with mcp_layer == 1; uncomment to parse it instead.
    # sample = b"7e03010000000000020000010200d90900010102000300040005000600070008000a000b001000110012001300140015001600170018001900200021002200230024003000310032003300490075005c005d005e5d005f00010102011001110112011301140115012001300131013301340136013701380139014101420143014401500151015201530172010102020204020502080209020f02100211022002210222022302240225022802010302030403050308030903100311030f032003210322032303240325032803a204530454047308a005a105a205a3057c08b005b105b205b305c0056d1b7e"
    parser = IspProtocolForLog()
    # The class declares no _fields_ of its own; the size is not used below.
    struct_size = sizeof(IspProtocolForLog)
    parser.deserdes(sample)
    print("this packets:%04x" % parser.crc16())
    for pdu in parser.payload():
        print("len:%d tag:%04x valueLen:%d" % (pdu.len, pdu.tag, len(pdu.value)))
class IspProtocol(Structure):
    """Parse one ISP frame supplied as raw bytes (``b"\\x7e...\\x7e"``).

    Frame layout as decoded by ``deserdes``::

        7e | IspCommonField | PDU, PDU, ... | IspEndField (CRC16 + 7e)

    Each PDU starts with a 1-byte length that counts the whole PDU, followed
    by a tag (2 bytes for MCPA, 4 bytes for MCPC) and the value.  Parsed PDUs
    are ``mcpa_tag``/``mcpc_tag`` objects exposing ``len``, ``tag`` and
    ``value`` (raw bytes).
    """

    # Class-level defaults; deserdes() binds fresh per-instance copies so that
    # two parser objects never overwrite each other's decoded header/trailer.
    __IspCommonField__ = IspCommonField()
    __IspCommonFieldLen__ = sizeof(__IspCommonField__)
    __IspEndField__ = IspEndField()
    __IspEndFieldLen__ = sizeof(__IspEndField__)
    __crc16__ = 0x0
    __crcFlag__ = False
    __pdu_type: pduType = None
    __pdu__ = {}
    __pduTagLen__: int

    def deserdes(self, msg: bytes):
        """Decode one binary frame, collect its PDUs and verify the CRC.

        Args:
            msg: the whole frame including the leading and trailing 0x7e.

        Returns:
            True when the CRC in the trailer matches the computed CRC,
            otherwise False (also for frames too short to hold header and
            trailer).
        """
        # Start from a clean slate so a failed or unknown frame never exposes
        # results left over from an earlier call.
        self.__pdu_type = None
        self.__pduList__ = []
        self.__crc16__ = 0x0
        # Tag width in bytes; 4 is also what an unknown mcp_layer reports.
        self.__pduTagLen__ = 4

        esc_msg = b"\x7e" + self.__escape_recv_packets(msg) + b"\x7e"

        head_len = self.__IspCommonFieldLen__
        tail_len = self.__IspEndFieldLen__
        if len(esc_msg) < head_len + tail_len:
            print("ISP frame too short.drop this packets ... ...")
            return False

        self.__IspCommonField__ = IspCommonField.from_buffer_copy(esc_msg[:head_len])

        pdu_cls = None
        if self.__IspCommonField__.mcp_layer == 3:
            self.__pdu_type = pduType.MCPC
            self.__pduTagLen__ = 4
            pdu_cls = mcpc_tag
        elif self.__IspCommonField__.mcp_layer == 1:
            self.__pdu_type = pduType.MCPA
            self.__pduTagLen__ = 2
            pdu_cls = mcpa_tag

        # Payload PDU part: only parsed when the PDU flavour is known.
        if pdu_cls is not None:
            self.__parse_pdus(esc_msg[head_len:-tail_len], pdu_cls)

        self.__IspEndField__ = IspEndField.from_buffer_copy(esc_msg[-tail_len:])
        # The CRC covers the frame without the leading 0x7e and the trailer.
        self.__crc16__ = self.__crc16(esc_msg[1:-tail_len])
        if self.__IspEndField__.crc != self.__crc16__:
            print("ISP crc code parse error.drop this packets ... ...")
            return False
        return True

    def __parse_pdus(self, pdu_part: bytes, pdu_cls):
        """Split the raw PDU area into ``pdu_cls`` objects (appended to the list)."""
        # Length byte plus the complete tag.  Copying from a slice shorter
        # than the struct would drop the tag's top byte and read past the data.
        head_len = sizeof(pdu_cls)
        pos = 0
        while pos < len(pdu_part):
            head = pdu_part[pos : pos + head_len]
            if len(head) < head_len:
                print("ISP pdu header truncated.stop parsing pdu ... ...")
                break
            pdu = pdu_cls.from_buffer_copy(head)
            if pdu.len < head_len:
                # A length smaller than the head (in particular 0) would
                # never advance the cursor correctly.
                print("ISP pdu length invalid.stop parsing pdu ... ...")
                break
            pdu.value = pdu_part[pos + head_len : pos + pdu.len]
            self.__pduList__.append(pdu)
            pos += pdu.len

    def __crc16(self, code: bytes):
        """Return the CRC16 (poly 0x11021, init 0, not reflected) of ``code``."""
        return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

    def __escape_recv_packets(self, msg: bytes):
        """Strip the 0x7e delimiters and undo the escaping.

        ``5e 5d`` decodes to ``5e`` and ``5e 7d`` decodes to ``7e``.
        """
        # Indexing bytes yields ints, so compare one-byte slices instead.
        if msg[:1] != b"\x7e" or msg[-1:] != b"\x7e":
            print("escape packets error")
        # Split on one escape pair and rewrite the other inside each piece so
        # that a freshly decoded 0x5e is never re-read as the start of an
        # escape (two chained replaces would turn "5e 5d 7d" into "7e").
        return b"\x5e".join(
            piece.replace(b"\x5e\x7d", b"\x7e")
            for piece in msg[1:-1].split(b"\x5e\x5d")
        )

    def PduType(self):
        """Return the pduType detected by the last ``deserdes`` call, or None."""
        return self.__pdu_type

    def pduTagLen(self):
        """Return the tag width in bytes chosen by the last ``deserdes`` call."""
        return self.__pduTagLen__

    def payload(self):
        """Return the list of PDUs collected by the last ``deserdes`` call."""
        return self.__pduList__

    def crc16(self):
        """Return the CRC computed by the most recent ``deserdes`` call."""
        return self.__crc16__
def example_parse_Isp_protocol():
    """Demo: parse a binary sample frame with IspProtocol and print its content.

    Prints the computed CRC, the detected PDU type and tag width, one line
    per PDU and, for every PDU whose tag is 0x9, each tag identifier listed
    in its value.
    """
    # Sample frame whose header has mcp_layer == 3.
    mcpc_sample = b"\x7E\x03\x01\x01\x00\x00\x00\x00\x16\x00\x00\x03\x02\x00\xFF\x09\x00\x00\x00\x05\x03\xBF\x05\x00\x00\xC0\x05\x00\x00\xC1\x05\x00\x00\xC2\x05\x00\x00\xC3\x05\x00\x00\xC4\x05\x00\x00\xC5\x05\x00\x00\xC6\x05\x00\x00\xC7\x05\x00\x00\xC8\x05\x00\x00\xC9\x05\x00\x00\xCA\x05\x00\x00\xCB\x05\x00\x00\xCD\x05\x00\x00\xCE\x05\x00\x00\xCF\x05\x00\x00\xD2\x05\x00\x00\xD3\x05\x00\x00\xD7\x05\x00\x00\x01\x20\x00\x00\x02\x20\x00\x00\x06\x20\x00\x00\x09\x20\x00\x00\x01\x21\x00\x00\x02\x21\x00\x00\x06\x21\x00\x00\x09\x21\x00\x00\x02\x22\x00\x00\x03\x22\x00\x00\x06\x22\x00\x00\x07\x22\x00\x00\x0A\x22\x00\x00\x0B\x22\x00\x00\x0C\x22\x00\x00\x0D\x22\x00\x00\x0E\x22\x00\x00\x0F\x22\x00\x00\x01\x23\x00\x00\x02\x23\x00\x00\x03\x23\x00\x00\x11\x23\x00\x00\x12\x23\x00\x00\x13\x23\x00\x00\x21\x23\x00\x00\x22\x23\x00\x00\x23\x23\x00\x00\x31\x23\x00\x00\x32\x23\x00\x00\x33\x23\x00\x00\x51\x23\x00\x00\x52\x23\x00\x00\x53\x23\x00\x00\x61\x23\x00\x00\x62\x23\x00\x00\x63\x23\x00\x00\x02\x25\x00\x00\x04\x25\x00\x00\x0A\x25\x00\x00\x03\x26\x00\x00\x04\x26\x00\x00\x05\x26\x00\x00\x13\x26\x00\x00\x92\xA4\x7E"
    # Sample frame whose header has mcp_layer == 1.
    mcpa_sample = b"\x7e\x03\x01\x00\x00\x00\x00\x00\x02\x00\x00\x01\x02\x00\xd9\x09\x00\x01\x01\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\x0a\x00\x0b\x00\x10\x00\x11\x00\x12\x00\x13\x00\x14\x00\x15\x00\x16\x00\x17\x00\x18\x00\x19\x00\x20\x00\x21\x00\x22\x00\x23\x00\x24\x00\x30\x00\x31\x00\x32\x00\x33\x00\x49\x00\x75\x00\x5c\x00\x5d\x00\x5e\x5d\x00\x5f\x00\x01\x01\x02\x01\x10\x01\x11\x01\x12\x01\x13\x01\x14\x01\x15\x01\x20\x01\x30\x01\x31\x01\x33\x01\x34\x01\x36\x01\x37\x01\x38\x01\x39\x01\x41\x01\x42\x01\x43\x01\x44\x01\x50\x01\x51\x01\x52\x01\x53\x01\x72\x01\x01\x02\x02\x02\x04\x02\x05\x02\x08\x02\x09\x02\x0f\x02\x10\x02\x11\x02\x20\x02\x21\x02\x22\x02\x23\x02\x24\x02\x25\x02\x28\x02\x01\x03\x02\x03\x04\x03\x05\x03\x08\x03\x09\x03\x10\x03\x11\x03\x0f\x03\x20\x03\x21\x03\x22\x03\x23\x03\x24\x03\x25\x03\x28\x03\xa2\x04\x53\x04\x54\x04\x73\x08\xa0\x05\xa1\x05\xa2\x05\xa3\x05\x7c\x08\xb0\x05\xb1\x05\xb2\x05\xb3\x05\xc0\x05\x6d\x1b\x7e"
    # The mcp_layer == 1 sample is the one parsed (as before); switch to
    # mcpc_sample to exercise the other path.
    buf = mcpa_sample

    hdr = IspProtocol()
    hdr.deserdes(buf)
    print("this packets:%04x" % hdr.crc16())
    print(hdr.PduType())
    print("protocol layer:%x" % hdr.pduTagLen())

    tag_len = hdr.pduTagLen()
    # Struct used to decode one tag identifier from a 0x9 PDU's value.
    if hdr.PduType() == pduType.MCPA:
        idx_cls = mcpa_idx
    elif hdr.PduType() == pduType.MCPC:
        idx_cls = mcpc_idx
    else:
        idx_cls = None

    for it in hdr.payload():
        print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
        if it.tag != 0x9 or idx_cls is None:
            continue
        # The offset restarts at 0 for every PDU, and only complete entries
        # are decoded so nothing is read past the end of the value.
        for offset in range(0, len(it.value) - tag_len + 1, tag_len):
            entry = idx_cls.from_buffer_copy(it.value[offset : offset + tag_len])
            print("tag:%08x" % entry.tag)
# Run the demo only when this file is executed directly (script or notebook),
# not as a side effect of importing it.
if __name__ == "__main__":
    example_parse_Isp_protocol()