1. 概述

在对某些安全产品的能力测试过程中,本人有接触到少部分的测试场景需要,需要人工针对性的修改或制造通信流量信息,故在此做简单记录。

主要涉及两种方式:

  • Tcprewrite:基于现有 pcap 直接修改,但修改字段和场景受限。

  • Scapy:完全编码自定义,灵活。

2. 基于现有流量修改

2.1 Tcpreplay工具链

Tcpreplay 是一个免费的开源,可以编辑和重放已捕获的网络流量,最初设计的目的是可以将恶意流量模式重放到入侵检测/防御系统。

该项目包含以下程序:

  • tcpreplay: replays pcap files at arbitrary speeds onto a network

  • tcprewrite: edits pcap files and creates a new pcap file

  • tcpreplay-edit: edits pcap files and replays onto a network

  • tcpprep: creates client/server definition cache files used by tcpreplay/tcprewrite

  • tcpbridge: bridge two network segments with the power of tcprewrite

  • tcpliveplay: replay a TCP pcap file in a manner that a server will recognize

  • tcpcapinfo: raw pcap file decoder and debugger

tcprewrite 就是工具链中用来编辑流量的工具:https://tcpreplay.appneta.com/wiki/tcprewrite-man.html

2.2 tcprewrite快速上手

这里简单列出几个可能的常见参数:

  • -s, --seed=<number>:通过伪随机种子修改源/目的IPv4/v6地址,禁止与--fuzz-seed同时使用,随机化的地址仍然保持客户端/服务器之间的关系

  • -r, --portmap=<port pairs>:批量重写TCP/UDP端口(如80:8000,8080:80),支持多组映射

  • -S string, --srcipmap=string / -D string, --dstipmap=string:修改源/目的地址,但只影响一个方向。

  • -N, --pnat=<CIDR pairs>:全局伪NAT重写IP(192.168.0.0/16:10.77.0.0/16),同时修改源和目的地址

  • -i, --infile=<file>:指定输入PCAP文件路径(必需参数)

  • -o, --outfile=<file>:指定输出PCAP文件路径(必需参数)

  • -e, --endpoints=<IP pairs>:将流量重写为指定两端点IP间通信(需配合-c缓存文件)

## 修改报文IP到同一指定值
tcpprep -p --pcap=input.pcap --cachefile=output.cache tcprewrite -i input.pcap -o output.pcap --cachefile=ouput.cache -e 1.1.1.1:2.2.2.2

## 修改端口到指定端口,不区分源、目的
tcprewrite --portmap=53:54 --infile=input.pcap --outfile=output.pcap

## 修改源IP到指定段
tcprewrite --srcipmap=172.16.0.0/12:10.1.0.0/24 --infile=input.pcap --outfile=output.pcap

2.3 特点总结

  • 修改字段有限,只能处理简单的流量,遇到批量复杂场景不适用。

  • 使用限制,无 windows 版本(本人未找到)。

3. 基于Scapy从零制造

3.1 Scapy 库

一个可以让用户发送、侦听和解析并伪装网络报文的Python 网络数据处理库,可用于编写侦测、扫描和攻击网络的工具程序。

3.2 快速上手

如下,使用 / 分割各层协议。

from scapy.all import *

# 以太网层
eth = Ether(src="00:11:22:33:44:55", dst="00:aa:bb:cc:dd:ee")

# IP 层
ip = IP(src="192.168.1.100", dst="192.168.1.1")

# TCP 层
tcp = TCP(sport=12345, dport=80, seq=1000, ack=2000, flags="PA")

# HTTP 数据负载
http_get = "GET / HTTP/1.1\r\nHost: 192.168.1.1\r\nUser-Agent: Scapy-HTTP-Example\r\n\r\n"
raw = Raw(load=http_get)

# 组合数据包
packet = eth / ip / tcp / raw

# 显示数据包结构(不发送)
packet.show()

#发送
send(packet) 

3.3 编程示例

概述:模拟构造 HTTP 请求,包含三次握手过程

from scapy.all import *
import random

def build_http_traffic(
    src_ip, 
    dst_ip, 
    src_port=0, 
    dst_port=80, 
    include_handshake=False
):
    """
    构造 HTTP 请求和响应流量包(可选 TCP 握手)。

    参数:
    - src_ip (str): 源 IP 地址(客户端)
    - dst_ip (str): 目标 IP 地址(服务器)
    - src_port (int): 源端口,如果为 0 则随机生成
    - dst_port (int): 目标端口,如果为 0 则随机生成
    - include_handshake (bool): 是否添加 TCP 握手包(SYN, SYN-ACK, ACK)

    返回:
    - list[scapy.Packet]: 构造好的数据包列表
    """
    packets = []

    # 1. 源端口和目标端口(随机生成)
    if src_port == 0:
        src_port = random.randint(1024, 65535)
    if dst_port == 0:
        dst_port = random.randint(1024, 65535)

    # 随机生成初始 seq 和 ack
    initial_seq = random.randint(0, 10000)
    initial_ack = random.randint(0, 10000)
    src_mac = str(RandMAC())
    dst_mac = str(RandMAC())

    # 2. 如要握手包
    if include_handshake:
        # --- SYN (客户端 -> 服务器)
        syn_packet = Ether(dst=dst_mac, src=src_mac)/IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags="S", seq=initial_seq)
        packets.append(syn_packet)

        # --- SYN-ACK (服务器 -> 客户端)
        syn_ack_packet = Ether(dst=src_mac, src=dst_mac)/IP(src=dst_ip, dst=src_ip) / TCP(sport=dst_port, dport=src_port, flags="SA", seq=initial_ack, ack=initial_seq+1)
        packets.append(syn_ack_packet)

        # --- ACK (客户端 -> 服务器)
        ack_packet = Ether(dst=dst_mac, src=src_mac)/IP(src=src_ip, dst=dst_ip) / TCP(sport=src_port, dport=dst_port, flags="A", seq=initial_seq+1, ack=initial_ack+1)
        packets.append(ack_packet)


    # 3. 构造 HTTP 请求包(客户端 -> 服务器)
    http_payload = "POST /api/sql?a='and(select 1,2 union(select count(*),concat(floor(rand(0)*2),0x3a)) HTTP/1.1\r\nHost: example.com\r\nUser-Agent: Scapy-HTTP-Client\r\n\r\n"
    http_request_packet = (
        Ether(dst=dst_mac, src=src_mac)/
        IP(src=src_ip, dst=dst_ip) /
        TCP(sport=src_port, dport=dst_port, flags="PA", seq=initial_seq+1, ack=initial_ack+1) /
        Raw(load=http_payload)
    )
    packets.append(http_request_packet)

    # 更新 seq
    initial_seq += len("GET /index.html HTTP/1.1\r\nHost: example.com\r\nUser-Agent: Scapy-HTTP-Client\r\n\r\n")

    # 4. 构造 HTTP 响应包(服务器 -> 客户端)
    http_response_data = "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: 12\r\n\r\n{\"status\": 1,\"msg\": \"OK\"}"
    http_response_packet = (
        Ether(dst=src_mac, src=dst_mac)/
        IP(src=dst_ip, dst=src_ip) /
        TCP(sport=dst_port, dport=src_port, flags="PA", seq=initial_ack+1, ack=initial_seq+1) /
        Raw(load=http_response_data)
    )
    packets.append(http_response_packet)

    return packets

target_ips = ["104.164.110.12", "104.167.221.114"]

all_packets = []
for ip in target_ips:
    all_packets.extend(build_http_traffic(
        src_ip=ip,
        dst_ip="172.16.20.75",
        src_port=0,       # 随机源端口
        dst_port=0,       # 随机目标端口
        include_handshake=True
    ))

wrpcap("http.pcap", all_packets)

print(f"已成功构造{len(target_ips)}个IP的HTTP握手请求包,共{len(all_packets)}个数据包,保存到http.pcap")

687099f31ba91.png