洞态IAST 钉钉自动化告警闭环

× 文章目录
  1. 1. 前言
  2. 2. 钉钉群聊机器人(这里可以使用飞书、企业微信等都可以的)
  3. 3. 洞态IAST接入钉钉机器人
  4. 4. 效果展示

前言

洞态IAST上线有一段时间了,基于被动式IAST技术,高检出率和低误报率等特点,很好的集成到devops流程中,增加我们的效率,然而每次发现漏洞后没有及时查看导致漏洞处理上的滞后,这里通过钉钉群里机器人做自动化告警,使IAST使用上闭环起来

钉钉群聊机器人(这里可以使用飞书、企业微信等都可以的)

群里机器人文档:https://developers.dingtalk.com/document/robots/custom-robot-access

介绍了如何接入、消息类型、数据格式和错误代码排查等使用上的问题

这里获取了钉钉机器人的webhook,复制下来,在安全设置中,添加关键词用来接收告警

1
2
webhook = https://oapi.dingtalk.com/robot/send?access_token=xxx
关键词:IAST

洞态IAST接入钉钉机器人

通过洞态服务架构了解,DongTai-engine 服务是用来漏洞检测的,根据调用方法数据和污点跟踪算法分析HTTP/HTTPS/RPC请求中是否存在漏洞,通过代码发现火线团队已经在代码中预留了发送漏洞告警的方法

1
DongTai-engine/signals/handlers/vul_handler.py  的 send_vul_notify()

我们接入钉钉告警机器人就放到这个方法里

这里可以看到,传入了一个vul参数,由handler_vul()调用 send_vul_notify(), 看上面的注释,vul参数的数据格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
http_url: 漏洞所在url
http_uri: 漏洞所在uri
context_path: HTTP请求上下文
http_method: HTTP请求方法
http_scheme: HTTP请求协议
http_protocol: HTTP请求协议
req_header: HTTP请求头
req_data: HTTP请求体
res_header: HTTP响应头
res_body: HTTP响应体
vul_type: 漏洞类型
vul_level: 漏洞等级
full_stack: 漏洞对应的调用链数据
top_stack: 漏洞对应污点调用链的链首
bottom_stack: 漏洞对应污点调用链的链尾
taint_value: 污点值
taint_position: 污点所在位置
agent_token: Agent的token
project: 所在的项目
counts: 漏洞出现次数
client_ip: 客户端IP
username: 漏洞所在用户的用户名

因为我们只是自己用,就不做交互处理,直接写死到代码中

钉钉机器人支持@人,这里可以根据不同业务线设置不同告警接收人

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 新增业务线告警接收人
def project_vul(vul):
if vul.agent.project_name == "project_1" or vul.agent.project_name == "project_2":
phone = "13111111111"
send_vul_notify(vul, phone)

elif vul.agent.project_name == "project_3" or vul.agent.project_name == "project_4":
phone = "13222222222"
send_vul_notify(vul, phone)

# 修改发动报警方法
def send_vul_notify(vul, phone):
"""
:param vul_data:
:return:
"""
dingding = "https://oapi.dingtalk.com/robot/send?access_token=xxx"
header = {
"Content-Type": "application/json",
"Charset": "UTF-8"
}

message = {
"msgtype": "text",
"text": {
"content": "@{} 洞态IAST告警通知:\n漏洞类型:{}\n危害等级:{}\n漏洞URL:{}\n业务线名称:{}\n探针agent:{}\n请及时处理!!!".format(phone, vul.type, vul.level.name_value, vul.url, vul.agent.project_name, vul.agent.token)
},
"at": {
"atMobiles": [
"{}".format(phone)
],
"isAtAll": False
}
}

resp = requests.post(url=dingding, headers=header, data=json.dumps(message))
if resp.status_code == 200:
pass

然后修改 handler_vul() 方法

1
2
3
if vul:
# send_vul_notify(vul)
project_vul(vul)

效果展示