mitmproxy + python 实现游戏协议测试

random 测试交流51,382字数 11846阅读模式

mitmproxy + python 实现游戏协议测试

本文侧重介绍如何使用 python 和 mitmproxy 实现拦截数据包、重发数据包,以及解析 protobuf 数据内容,对于相关依赖的安装不做介绍。

一、游戏协议安全测试内容

参考https://testerhome.com/topics/29053,这篇文章讲的很清楚。文章源自玩技e族-https://www.playezu.com/246905.html


二、实现原理

想直接使用的同学可以跳到第三部分。文章源自玩技e族-https://www.playezu.com/246905.html

mitmproxy 作为代理,可以获取客户端与服务端通信的数据,并且可以拦截、修改和自主发送数据。当配合其证书使用时,还可以解密 wss 连接中的 websocket 数据。文章源自玩技e族-https://www.playezu.com/246905.html

  • Websotcket 数据处理源码分析

在 http 代理的过程中若发现 upgrade websocket 请求,则创建 WebSocketLayer 实例,并调用其_call_方法。文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/http.py
"""以下为Httplayer的_process_flow方法的部分代码"""
if f.response.status_code == 101:
# Handle a successful HTTP 101 Switching Protocols Response,
    # received after e.g. a WebSocket upgrade request.
    # Check for WebSocket handshake
    is_websocket = (
websockets.check_handshake(f.request.headers) and
websockets.check_handshake(f.response.headers)
)
if is_websocket and not self.config.options.websocket:
self.log(
"Client requested WebSocket connection, but the protocol is disabled.",
"info"
)
if is_websocket and self.config.options.websocket:
layer = WebSocketLayer(self, f)
else:
layer = self.ctx.next_layer(self)
layer()

WebSocketLayer 初始化时会创建用于此次 websocket 通信的编解码器。文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的init方法,省略部分代码"""
def __init__(self, ctx, handshake_flow):
super().__init__(ctx)
self.handshake_flow = handshake_flow
self.connections: dict[object, WSConnection] = {}
client_extensions = []
server_extensions = []
# 判断交互数据是否使用deflate压缩
    if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:
client_extensions = [PerMessageDeflate()]
server_extensions = [PerMessageDeflate()]
# self.client_conn和self.server_conn继承自ctx,即原http的client和server,原理为父类的__getattr__(self, name)方法返回的是getattr(self.ctx, name)。WSConnection是一个websocket协议编解码器,实际不会发送任何网络IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html
    # 负责和解码server收到信息和编码server发送的信息
    self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER)
# 负责和解码client收到信息和编码client发送的信息
    self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT)
# 构造发送给Server的websocket的握手请求
    request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path)
# send()方法只会构造一个适用于对应conn的数据,并不会真正发送数据,recv_data()会将信息解码,需要通过next(conn.events())获取解码后数据
    # 按上所说,下面两行代码的操作是将握手请求按client编码后发送给server编码器,然后让server编码器解码
    data = self.connections[self.server_conn].send(request)
self.connections[self.client_conn].receive_data(data)
event = next(self.connections[self.client_conn].events())
assert isinstance(event, events.Request)
# 返回给客户端接受连接响应
    data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions))
self.connections[self.server_conn].receive_data(data)
assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)

WebSocketLayer 实例的_call_方法负责处理后续 websocket 通信文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的call方法,省略部分代码"""
def __call__(self):
self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow)
self.flow.metadata['websocket_handshake'] = self.handshake_flow.id
self.handshake_flow.metadata['websocket_flow'] = self.flow.id
# 调用addons中的websocket_start(self, flow)对flow进行处理
    self.channel.ask("websocket_start", self.flow)
conns = [c.connection for c in self.connections.keys()]
close_received = False
try:
while not self.channel.should_exit.is_set():
# 往client或server插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server是队列,后续实现在连接中主动发消息就是通过往队列中插入数据实现
            self._inject_messages(self.client_conn, self.flow._inject_messages_client)
self._inject_messages(self.server_conn, self.flow._inject_messages_server)
# select监视原http的client和server连接的可读事件
            r = tcp.ssl_read_select(conns, 0.1)
for conn in r:
source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
is_server = (source_conn == self.server_conn)
frame = websockets.Frame.from_file(source_conn.rfile)
# 将从conn中获取的数据放入编解码器,此方法并没有返回值,所以data是None
                data = self.connections[source_conn].receive_data(bytes(frame))
# data是None,不解此举有何意义
                source_conn.send(data)
if close_received:
return
# 处理编解码器中解码后的数据,event由pop取出,后续不会再用到。
                for event in self.connections[source_conn].events():
if not self._handle_event(event, source_conn, other_conn, is_server):
if not close_received:
close_received = True
except (socket.error, exceptions.TcpException, SSL.Error) as e:
s = 'server' if is_server else 'client'
self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e)))
# 调用addons中的websocket_start(self, flow)对flow进行处理
        self.channel.tell("websocket_start", self.flow)
finally:
self.flow.ended = True
# 调用addons中的websocket_end(self, flow)对flow进行处理
        self.channel.tell("websocket_end", self.flow)

WebSocketLayer 实例中处理 Message Event 的方法文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的_handle_message方法,_handle_event中,若isinstance(event, events.Message),则会调用此函数"""
def _handle_message(self, event, source_conn, other_conn, is_server):
fb = self.server_frame_buffer if is_server else self.client_frame_buffer
fb.append(event.data)
if event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
if isinstance(event, events.TextMessage):
message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
fb.clear()
websocket_message = WebSocketMessage(message_type, not is_server, payload)
length = len(websocket_message.content)
self.flow.messages.append(websocket_message)
# 调用addons中的websocket_message(self, flow)对flow进行处理
        self.channel.ask("websocket_message", self.flow)
# WebsocketMessage的属性killed用于判断该信息是否需要被转发,可在websocket_message函数中调用message的kill()方法置为True
        if not self.flow.stream and not websocket_message.killed:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
                    pos = 0
for s in original_chunk_sizes:
yield (payload[pos:pos + s], True if pos + s == length else False)
pos += s
else:
# just re-chunk everything into 4kB frames
                    # header len = 4 bytes without masking key and 8 bytes with masking key
                    chunk_size = 4092 if is_server else 4088
chunks = range(0, len(payload), chunk_size)
for i in chunks:
yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)
# 将收到的信息重新编码后向对端发送
            for chunk, final in get_chunk(websocket_message.content):
data = self.connections[other_conn].send(Message(data=chunk, message_finished=final))
other_conn.send(data)
if self.flow.stream:
data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished))
other_conn.send(data)
return True
  • Tcp 数据处理源码分析

TCP 数据处理触发条件文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/root_context.py
"""RootContext类_next_layer方法,省略部分代码"""
"""
4. Check for --tcp
判断Option中tcp_hosts, 类型是一个列表,包含需要转换成tcp流信息的server address正则表达式,例如['192.168.d+.d+']
"""
if self.config.check_tcp(top_layer.server_conn.address):
return protocol.RawTCPLayer(top_layer)
"""
6. Check for raw tcp mode
判断Option中rawtcp,类型是bool,若为true,则将不能处理的流转换成tcp流处理,建议开启,默认是false
"""
is_ascii = (
len(d) == 3 and
# expect A-Za-z
    all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
)
if self.config.options.rawtcp and not is_ascii:
return protocol.RawTCPLayer(top_layer)

TCP 信息处理 RawTCPLayer 类源码文章源自玩技e族-https://www.playezu.com/246905.html

class RawTCPLayer(base.Layer):
chunk_size = 4096
def __init__(self, ctx, ignore=False):
self.ignore = ignore
super().__init__(ctx)
def __call__(self):
self.connect()
if not self.ignore:
f = tcp.TCPFlow(self.client_conn, self.server_conn, self)
# 调用addons中的tcp_start(self, flow)对flow进行处理
            self.channel.ask("tcp_start", f)
# 创建一个长度为4096的空bytearray
        buf = memoryview(bytearray(self.chunk_size))
client = self.client_conn.connection
server = self.server_conn.connection
conns = [client, server]
# https://github.com/openssl/openssl/issues/6234
        for conn in conns:
if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"):
SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY)
try:
while not self.channel.should_exit.is_set():
r = mitmproxy.net.tcp.ssl_read_select(conns, 10)
for conn in r:
dst = server if conn == client else client
try:
# 将从conn中recv的数据存入buf,返回size
                        size = conn.recv_into(buf, self.chunk_size)
except (SSL.WantReadError, SSL.WantWriteError):
continue
if not size:
conns.remove(conn)
# Shutdown connection to the other peer
                        if isinstance(conn, SSL.Connection):
# We can't half-close a connection, so we just close everything here.
                            # Sockets will be cleaned up on a higher level.
                            return
else:
dst.shutdown(socket.SHUT_WR)
if len(conns) == 0:
return
continue
# 将recv的数据转成TCPMessage
                    tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes())
if not self.ignore:
f.messages.append(tcp_message)
# 调用addons中的tcp_message(self, flow)对flow进行处理
                        self.channel.ask("tcp_message", f)
# 发送tcp_message中的content
                    dst.sendall(tcp_message.content)
except (socket.error, exceptions.TcpException, SSL.Error) as e:
if not self.ignore:
f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e)))
# 调用addons中的tcp_error(self, flow)对flow进行处理
                self.channel.tell("tcp_error", f)
finally:
if not self.ignore:
# 调用addons中的tcp_end(self, flow)对flow进行处理
                self.channel.tell("tcp_end", f)

三、开启 mitmproxy 并加载 addon

首先需要安装两个库:mitmproxy 和 mitmdump文章源自玩技e族-https://www.playezu.com/246905.html

1、编写 websocket 的 addon
"""
简略版用于websocket的Addon
后续改进可以增加判断host,避免拦截到不需要处理的连接,或者将Queue改成redis
"""
import asyncio
from multiprocessing import Queue
import mitmproxy.websocket
class WebsocketAddon:
def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
self._input_q = input_q
self._output_q = output_q
async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
while not flow.ended and not flow.error:
# 增加间隔,否则会阻塞event
            await asyncio.sleep(0.5)
while not self._input_q.empty():
# WebSocketFlow的内置方法,用于主动插入信息,这里我只主动插入client->server的信息
                flow.inject_message(flow.server_conn, self._input_q.get())
def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
# 加入发送websocket消息的task,参考了官方的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message
        asyncio.get_event_loop().create_task(self.inject(flow))
def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
message = flow.messages[-1]
self._output_q.put({
'from_client': message.from_client,
'data': message.content
})
# message.kill()可以让Layer不转发该条信息,我这里的目的是拦截掉所有客户端发送的数据,由自己编辑后再发送
        if message.from_client:
message.kill()
2、编写 socket 的 addon
"""
简略版用于socket的Addon
和websocket版差别不大,插入数据和拦截数据有区别
"""
import asyncio
from multiprocessing import Queue
import mitmproxy.tcp
class SocketAddon:
def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
self._input_q = input_q
self._output_q = output_q
async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
while flow.live and not flow.error:
await asyncio.sleep(0.5)
while not self._input_q.empty():
# 直接向对端发送socket信息完成插入
                    flow.server_conn.connection.sendall(payload)
def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
asyncio.get_event_loop().create_task(self.inject(flow))
def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
message = flow.messages[-1]
self._output_q.put({
'from_client': message.from_client,
'data': message.content
})
if message.from_client:
# socket发送0字节,conn.sendall(b'')将不会发送任何数据
           message.content = b''
3、开启 mitmproxy 并完成处理函数
import multiprocessing
from mitmdump import Options, DumpMaster
def start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
addons = [
# 自主选择是使用Websocket还是Socket
            WebsocketAddon(input_q, output_q)
# SocketAddon(input_q, output_q)
        ]
opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5',
rawtcp=True,
# 需要转换tcp数据成的ip正则
                   tcp_hosts=['.*'],
flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, )
m = DumpMaster(opts)
m.addons.add(*addons)
m.run()
def deal_client_message_func(client_message: [bytes, str]):
if type(client_message) is bytes:
return client_message.decode('utf-8').encode('gbk')
elif type(client_message) is str:
return f"test {client_message}"
def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
while True:
if not output_q.empty()
message = output_q.get()
print(f"{'客户端' if message['from_client'] else '服务端'} 包内容:{message['data']}")
if message['from_client']:
input_q.push(deal_client_message_func(message['data']))
def main():
input_queue = multiprocessing.Queue()
output_queue = multiprocessing.Queue()
# 使用子进程启动proxy
    multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start()
simple_handel_message_func(input_queue, output_queue)

四、总结

​ 对于想实现开头文中所提到的功能还需要实现客户端,以及对于 protobuf 协议的编解码,这里限于篇幅不再讨论,后续有机会再更新。

​ 另外,之所以 mitmproxy 选择 socks5 模式,是因为 socks 协议支持代理除了 http、https 以外更多种类的协议,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。

玩技站长微信
添加好友自动发送入群邀请
weinxin
rainbow-shownow
玩技官方公众号
官方微信公众号
weinxin
PLAYEZU
 
  • 版权提示:本站仅供存储任何法律责任由作者承担▷诈骗举报◁▷新闻不符◁▷我要投稿◁
    风险通知:非原创文章均为网络投稿真实性无法判断,侵权联系2523030730
    免责声明:内容来自用户上传发布或新闻客户端自媒体,切勿!切勿!切勿!添加联系方式以免受骗。
  • 原创转载:https://www.playezu.com/246905.html
    转载说明: 点我前往阅读>>>
评论  5  访客  5
    • Raymondcex
      Raymondcex 0

      Get millions upon millions of instant leads for your organization to jumpstart your advertising campaign. Make use of the lists an infinite number of times. We have been delivering companies and market research firms with information since 2012. Direct Marketing

      • Cuinn
        Cuinn 9

        想了解一下是 websocket+ protobuf 这种私有协议怎么去进行抓包解析,望楼主回复或出帖子分享,万分感谢!

        • 小趴菜
          小趴菜 9

          仅楼主可见

          • 自然生长
            自然生长 9

            应该是命令行工具没法修改重播吧,官方 addon example 里还有修改 websocket 的例子呢,但原始 tcp 官方没有例子,但通过看源代码也能实现。

            • 陈随想
              陈随想 9

              https://docs.mitmproxy.org/stable/concepts-protocols/
              看了下文档,发现 V7 文档写着还不能修改重播?有点奇怪。另外看 issue 问题似乎也挺多的。

            匿名

            发表评论

            匿名网友
            确定