1. 概述
在对某些安全产品的能力测试过程中,本人有接触到少部分的测试场景需要,需要人工针对性的修改或制造通信流量信息,故在此做简单记录。
主要涉及两种方式:
Tcprewrite:基于现有 pcap 直接修改,但修改字段和场景受限。
Scapy:完全编码自定义,灵活。
2. 基于现有流量修改
2.1 Tcpreplay工具链
Tcpreplay 是一个免费的开源,可以编辑和重放已捕获的网络流量,最初设计的目的是可以将恶意流量模式重放到入侵检测/防御系统。
该项目包含以下程序:
tcpreplay: replays pcap files at arbitrary speeds onto a network
tcprewrite: edits pcap files and creates a new pcap file
tcpreplay-edit: edits pcap files and replays onto a network
tcpprep: creates client/server definition cache files used by tcpreplay/tcprewrite
tcpbridge: bridge two network segments with the power of tcprewrite
tcpliveplay: replay a TCP pcap file in a manner that a server will recognize
tcpcapinfo: raw pcap file decoder and debugger
tcprewrite 就是工具链中用来编辑流量的工具:https://tcpreplay.appneta.com/wiki/tcprewrite-man.html
2.2 tcprewrite快速上手
这里简单列出几个可能的常见参数:
-s, --seed=<number>:通过伪随机种子修改源/目的IPv4/v6地址,禁止与--fuzz-seed同时使用,随机化的地址仍然保持客户端/服务器之间的关系-r, --portmap=<port pairs>:批量重写TCP/UDP端口(如80:8000,8080:80),支持多组映射-S string, --srcipmap=string/-D string, --dstipmap=string:修改源/目的地址,但只影响一个方向。-N, --pnat=<CIDR pairs>:全局伪NAT重写IP(192.168.0.0/16:10.77.0.0/16),同时修改源和目的地址-i, --infile=<file>:指定输入PCAP文件路径(必需参数)-o, --outfile=<file>:指定输出PCAP文件路径(必需参数)-e, --endpoints=<IP pairs>:将流量重写为指定两端点IP间通信(需配合-c缓存文件)
## 修改报文IP到同一指定值
tcpprep -p --pcap=input.pcap --cachefile=output.cache tcprewrite -i input.pcap -o output.pcap --cachefile=ouput.cache -e 1.1.1.1:2.2.2.2
## 修改端口到指定端口,不区分源、目的
tcprewrite --portmap=53:54 --infile=input.pcap --outfile=output.pcap
## 修改源IP到指定段
tcprewrite --srcipmap=172.16.0.0/12:10.1.0.0/24 --infile=input.pcap --outfile=output.pcap2.3 特点总结
修改字段有限,只能处理简单的流量,遇到批量复杂场景不适用。
使用限制,无 windows 版本(本人未找到)。
3. 基于Scapy从零制造
3.1 Scapy 库
一个可以让用户发送、侦听和解析并伪装网络报文的Python 网络数据处理库,可用于编写侦测、扫描和攻击网络的工具程序。
3.2 快速上手
如下,使用 / 分割各层协议。
from scapy.all import *
# 以太网层
eth = Ether(src="00:11:22:33:44:55", dst="00:aa:bb:cc:dd:ee")
# IP 层
ip = IP(src="192.168.1.100", dst="192.168.1.1")
# TCP 层
tcp = TCP(sport=12345, dport=80, seq=1000, ack=2000, flags="PA")
# HTTP 数据负载
http_get = "GET / HTTP/1.1\r\nHost: 192.168.1.1\r\nUser-Agent: Scapy-HTTP-Example\r\n\r\n"
raw = Raw(load=http_get)
# 组合数据包
packet = eth / ip / tcp / raw
# 显示数据包结构(不发送)
packet.show()
#发送
send(packet) 3.3 编程示例
概述:模拟构造 HTTP 请求,包含三次握手过程
from scapy.all import *
import random
def build_http_traffic(
src_ip,
dst_ip,
src_port=0,
dst_port=80,
include_handshake=False
):
"""
构造 HTTP 请求和响应流量包(可选 TCP 握手)。
参数:
- src_ip (str): 源 IP 地址(客户端)
- dst_ip (str): 目标 IP 地址(服务器)
- src_port (int): 源端口,如果为 0 则随机生成
- dst_port (int): 目标端口,如果为 0 则随机生成
- include_handshake (bool): 是否添加 TCP 握手包(SYN, SYN-ACK, ACK)
返回:
- list[scapy.Packet]: 构造好的数据包列表
"""
packets = []
# 1. 源端口和目标端口(随机生成)
if src_port == 0:
src_port = random.randint(1024, 65535)
if dst_port == 0:
dst_port = random.randint(1024, 65535)
# 随机生成初始 seq 和 ack
initial_seq = random.randint(0, 10000)
initial_ack = random.randint(0, 10000)
src_mac = str(RandMAC())
dst_mac = str(RandMAC())
# 2. 如要握手包
if include_handshake:
# --- SYN (客户端 -> 服务器)
syn_packet = Ether(dst=dst_mac, src=src_mac)/IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags="S", seq=initial_seq)
packets.append(syn_packet)
# --- SYN-ACK (服务器 -> 客户端)
syn_ack_packet = Ether(dst=src_mac, src=dst_mac)/IP(src=dst_ip, dst=src_ip) / TCP(sport=dst_port, dport=src_port, flags="SA", seq=initial_ack, ack=initial_seq+1)
packets.append(syn_ack_packet)
# --- ACK (客户端 -> 服务器)
ack_packet = Ether(dst=dst_mac, src=src_mac)/IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags="A", seq=initial_seq+1, ack=initial_ack+1)
packets.append(ack_packet)
# 3. 构造 HTTP 请求包(客户端 -> 服务器)
http_payload = "POST /api/sql?a='and(select 1,2 union(select count(*),concat(floor(rand(0)*2),0x3a)) HTTP/1.1\r\nHost: example.com\r\nUser-Agent: Scapy-HTTP-Client\r\n\r\n"
http_request_packet = (
Ether(dst=dst_mac, src=src_mac)/
IP(src=src_ip, dst=dst_ip) /
TCP(sport=src_port, dport=dst_port, flags="PA", seq=initial_seq+1, ack=initial_ack+1) /
Raw(load=http_payload)
)
packets.append(http_request_packet)
# 更新 seq
initial_seq += len("GET /index.html HTTP/1.1\r\nHost: example.com\r\nUser-Agent: Scapy-HTTP-Client\r\n\r\n")
# 4. 构造 HTTP 响应包(服务器 -> 客户端)
http_response_data = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: 12\r\n\r\n{\"status\": 1,\"msg\": \"OK\"}"
http_response_packet = (
Ether(dst=src_mac, src=dst_mac)/
IP(src=dst_ip, dst=src_ip) /
TCP(sport=dst_port, dport=src_port, flags="PA", seq=initial_ack+1, ack=initial_seq+1) /
Raw(load=http_response_data)
)
packets.append(http_response_packet)
return packets
target_ips = ["104.164.110.12", "104.167.221.114"]
all_packets = []
for ip in target_ips:
all_packets.extend(build_http_traffic(
src_ip=ip,
dst_ip="172.16.20.75",
src_port=0, # 随机源端口
dst_port=0, # 随机目标端口
include_handshake=True
))
wrpcap("http.pcap", all_packets)
print(f"已成功构造{len(target_ips)}个IP的HTTP握手请求包,共{len(all_packets)}个数据包,保存到http.pcap")