1. 概述

Splunk 是一款强大的机器数据分析平台,被誉为"数据界的谷歌"。它能够对任何机器生成的数据(如日志、性能指标等)进行实时收集、索引、分析和可视化。在安全领域,Splunk 更是安全运营中心(SOC)的核心平台,帮助企业实现安全监控、威胁检测和事件响应。

在很多中、大客户安全建设里,会使用 Splunk 作为 SOC 运营,所以我这里简单记录总结一下,便于后续遇到这个产品能够快速的回顾和上手。

2. 基础架构

直接看架构图,便于我们直观理解。这里不讨论单体架构,在企业场景里,一般都是分布式部署,如下图。

C29ACCEE-23C4-4114-835D-BA9F73874069.png

其实主要就包含几部分:

  • 转发器(Forwarder):轻量级数据收集节点,主要作用就是用来接收数据,分为通用、重型、轻型;可以通过本地文件、TCP、UDP、HTTP 等多种方式接收数据。

  • 索引器(Indexer):存储节点,主要作用就是用来存储数据,搜索头的数据集。

  • 搜索头(Search Head):提供用户界面和搜索能力。

  • 其他:简单了解,控制头、授权头等。

实际上,基本每个节点都可以独立使用,只是扮演的角色不同,资源配置不同。比如你可以直接在索引节点上开一个数据接收,直接接收数据,也可以在索引节点上直接进行搜索。

所以,上图就很容易理解了。具体的数据日志如安全设备 WAF 日志、Windows 主机日志、防火墙日志等直接通过 syslog 日志推送或 纯 TCP/UDP 发送到转发器,随后转发器将数据转发到索引器进行存储 ,最后通过搜索头进行数据搜索。

3. 搜索使用

36E80916-23D3-412F-9CC0-BE2384655348.png
76F659B2-2A86-4FE4-8446-3B5F0256A69E.png

在应急使用场景中,我们最好能够熟练的应用搜索技巧,我简单总结如下:

  • 关键字搜索,可以直接使用通配符*进行搜索,如果存在空格可以使用引号包裹。

  • 运算连接符:AND,NOT、NOT、 管道符 |,注意需要大写。

  • 转义:遇到特殊字符记得转义。

  • 关键语法:stat、table、rename、rex、regex,append 等

在使用前需要,需要提前了解几个主要字段。

  • index:索引,比如上面创建的 waf_log。

  • host:主要就是记录是哪个节点发送的日志,比如是安全设备发送过来的,就记录安全设备发送过来的 IP,可以直接锁定设备。

  • source:数据接入,哪个方式接收的,如上面开放的 TCP:8081。

0BA72E13-B3DF-490A-996D-A8A20941D832.png

搜索示例:

1. 简单关键字搜索,使用了引号和转义。

198C06B0-954F-4613-8D1F-CB853983AE50.png

2. 进一步使用运算连接符,加入 NOT、AND(空格)进行搜索,缩小搜索范围。

EE9F4C88-B1CC-4F6F-AE92-05C5FAF54744.png

3. 临时提取字段src_ip, level, url,以便用于后续分析。

可能是因为免费版本的原因,导致 json 格式的字段没有直接识别成字段。实际在复杂日志格式时,可以为索引配置字段提取规则进行字段提取,便于直接使用。

5BD3D74F-47CA-4A7A-9E36-08E6EE24A1D4.png

4. 使用统计语法,提取完字段后,直接展示为表格形式,可以更直观的查看。

A2892BE6-0069-4041-B0C9-772816BA62B9.png

5. 进一步统计,使用 stats count 根据字段统计出现的次数。

203FB0BB-26FB-4C38-BE30-F18D65D4888C.png

6. 再进一步进行统计, 加入了字段重命名,排序。

80229C91-EC58-4A17-9E78-8AB503A2305E.png

4. 其他功能

4.1 告警

客户运营群里的告警是怎么产生的?其实就是可以设置定时任务进行搜索,就是基于搜索结果筛选关键的字段如table 语法通过电子邮件、webhook 等方式直接进行结果推送输出。

然后通过中间处理程序/服务将结果变成飞书、企微等特殊的消息通知。

D8DA0E0C-9B78-402B-81E2-457AF4C5563A.png

4.2 数据输入

数据输入就是开放端口来接收日志,指定存储的索引及相关配置。以目前数据为例,这边创建了 3 个索引,用于存储数据。

D918ECDE-4101-464C-B61E-36598C552205.png

直接创建了 3 个 TCP 端口接收数据输入,分别对应上面的 3 个索引。

349094AD-5833-4FB0-BFC8-64F1678E6997.png

5. 总结

实际 Splunk 功能很强大,还有很多功能点未研究,但在应急场景里,只要熟练善用搜索能力即可。因时间有限,后续再进行补充。

附录

附 1-安装使用

docker pull splunk:splunk:latest

docker run -d -p 8000:8000 -p 9997:9997 -e SPLUNK_LICENSE_URI=Free -e "SPLUNK_GENERAL_TERMS=--accept-sgt-current-at-splunk-com" -e "SPLUNK_START_ARGS=--accept-license" -e "SPLUNK_PASSWORD=Wwg@1234" --name splunk splunk:latest

附 2-数据生成

这里写了个两个简单脚本生成样例数据便。

模拟安全设备日志脚本,逐行生成 json 格式日志:

import json
import random
import time
import sys
from datetime import datetime

class WAFLogGenerator:
    def __init__(self):
        # 固定的IP地址池,便于分析
        self.fixed_ips = ["192.168.1.100", "10.10.1.50", "172.16.8.123", "203.0.113.45", "198.51.100.77"]
        self.attack_types = ["sqli", "xss", "cmdi", "lfi", "rfi", "webshell"]
        self.actions = ["block", "alert", "pass"]
        self.levels = ["low", "medium", "high", "critical"]
        self.methods = ["GET", "POST", "PUT", "DELETE"]
        
    def generate_waf_log(self):
        """生成单条WAF日志(JSON格式)"""
        log_entry = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "src_ip": random.choice(self.fixed_ips),
            "attack_type": random.choice(self.attack_types),
            "action": random.choice(self.actions),
            "level": random.choice(self.levels),
            "method": random.choice(self.methods),
            "url": f"/{random.choice(['admin', 'api', 'login', 'search'])}.php",
            "user_agent": random.choice([
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                "python-requests/2.28.1",
                "curl/7.68.0"
            ]),
            "rule_id": f"rule_{random.randint(10000, 99999)}"
        }
        return json.dumps(log_entry, ensure_ascii=False)

def main():
    if len(sys.argv) != 2:
        print("用法: python waf_generator.py <日志条数>")
        sys.exit(1)
    
    try:
        num_logs = int(sys.argv[1])
    except ValueError:
        print("错误: 参数必须是数字")
        sys.exit(1)
    
    generator = WAFLogGenerator()
    filename = f"waf_logs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    
    print(f"开始生成 {num_logs} 条WAF日志到文件: {filename}")
    
    with open(filename, 'w', encoding='utf-8') as f:
        for i in range(num_logs):
            log_line = generator.generate_waf_log()
            f.write(log_line + '\n')  # 写入文件,每条日志一行
            
            # 在控制台显示进度
            if (i + 1) % 10 == 0:
                print(f"已生成 {i + 1}/{num_logs} 条日志")
            
            # 添加短暂延迟使时间戳有差异
            time.sleep(0.01)
    
    print(f"WAF日志生成完成,共 {num_logs} 条,保存至: {filename}")

if __name__ == "__main__":
    main()

数据发送脚本,主要功能就是逐行读取文件内容,发送至 Splunk 接收端口:

import socket
import sys
import time
import os
from datetime import datetime

# 目标服务器配置
HOST = '14.103.250.212'
PORT = 8082

if len(sys.argv) < 2:
    print("使用方法: python tcp_send.py <日志文件路径>")
    print("示例: python tcp_send.py access.log")
    sys.exit(1)

log_file = sys.argv[1]

if not os.path.exists(log_file) or not os.path.isfile(log_file):
    print(f"✗ 错误: 文件不存在: {log_file}")
    sys.exit(1)

print("=" * 60)
print(f"TCP 日志文件发送工具")
print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)

print(f"\n[0/4] 准备读取日志文件...")
print(f"    文件路径: {os.path.abspath(log_file)}")
print(f"    文件大小: {os.path.getsize(log_file):,} 字节")

line_count = 0
with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
    for _ in f:
        line_count += 1

print(f"    总行数: {line_count:,} 行")

try:
    # 创建socket并连接
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(10)  # 设置10秒超时
    
    s.connect((HOST, PORT))
    
    local_addr, local_port = s.getsockname()
    remote_addr, remote_port = s.getpeername()
    
    print(f"    ✓ 连接成功!")
    print(f"    本地地址: {local_addr}:{local_port}")
    print(f"    远程地址: {remote_addr}:{remote_port}")
    
    total_bytes_sent = 0
    success_count = 0
    fail_count = 0
    
    with open(log_file, 'r', encoding='utf-8', errors='ignore') as f:
        for line_num, line in enumerate(f, 1):
            line = line.rstrip('\n\r')
            processed_line = line.replace('[', '').replace(']', '')
            if not processed_line.strip():
                continue
            
            try:
                data_to_send = (processed_line + '\n').encode('utf-8')
                bytes_sent = s.sendall(data_to_send)
                
                if bytes_sent is None:
                    total_bytes_sent += len(data_to_send)
                    success_count += 1
                    
                    if line_num % 100 == 0 or line_num == line_count:
                        progress = (line_num / line_count) * 100
                        print(f"    进度: {line_num}/{line_count} ({progress:.1f}%) - 成功: {success_count}, 失败: {fail_count}")
                else:
                    total_bytes_sent += bytes_sent if bytes_sent else 0
                    success_count += 1
                    
            except Exception as e:
                fail_count += 1
                print(f"    ✗ 第 {line_num} 行发送失败: {e}")
    
    print(f"\n    发送完成!")
    print(f"    成功发送: {success_count:,} 行")
    if fail_count > 0:
        print(f"    发送失败: {fail_count:,} 行")
    print(f"    总字节数: {total_bytes_sent:,} 字节")
    
    s.shutdown(socket.SHUT_WR)
    time.sleep(0.5)
    s.close()
    print(f"    ✓ 连接已完全关闭")
    
    print("\n" + "=" * 60)
    print("✓ 全部操作完成!")
    print("=" * 60)
    
except Exception as e:
    print(f"\n✗ 发送失败: {type(e).__name__}: {e}")
    sys.exit(1)

参考链接: