如何通過人工智慧技術構建自己的UBA引擎(上)
0x00、AI在安全行業的應用前景
簡單對國內外安全公司做了一下調研,發現目前AI安全投資重點:
通過機器學習或者深度學習技術檢測惡意軟體替換傳統的基於特徵殺軟技術
提升SOC安全運營中心效率
反欺詐
百度指數
從2016年3月,人工智慧+網路安全關鍵字熱度。
google趨勢
從2017年1月,人工智慧+網路安全關鍵字熱度。
人工智慧技術興起時間在2016年3月開始,而AI公司主要集中在北京。
下面我選擇一個方向,提升SOC效率方面其中的UBA技術。
0x01、產品需求
目前基於關聯規則的SIEM產品市場越來越難做了,因為很多企業級用戶都感覺沒什麼用,其實也不是沒用,只是需要一支對黑客行為有著強大直覺的高級IT安全運維隊伍,然而沒有多少用戶有這種超能力。所以用人工智慧的方法建立一套自己的用戶行為分析引擎是解決上述高級安全人力問題的根本。
0x02、產品場景定義
對於人工智慧需要解決問題,首先確定使用場景,我們先假定一個這樣的場景:伺服器安全中的shell審計功能。
命令收集架構:通過更改的bash收集登陸堡壘機的用戶的命令,然後把數據存儲到ES,然後使用UBA引擎分析。
0x03、UBA分析引擎思路
UBA的本質是動態建立正常用戶行為基線,通過行為基線預測新執行的命令序列是不是異常行為。人工智慧UBA引擎,必須是可以通過訓練定義出用戶的正常行為。
根據上述需求,建立以下機器學習建模形式。
安全機器學習技術的尷尬
也看了市面上很多AI安全大佬的文章,但是發現一個很嚴重的問題,他們的訓練集都是很古老的數據,2000年以前的數據,我暈,今年都2018年的了,這嚴重和事實不符。所以說,要想做好AI模型提供優質的原始數據。並且做好數據標註。這裡的標註其實就是資深高級IT分析人員分析結果。
有幸在朋友的公司搭建的shell審計系統中拿到了200萬數據樣本。這些是我們分析的數據源。
0x04、UBA分析引擎演算法實現
拿到數據後,做的第一件事就是清洗數據,根據包含的命令行確定其風險指數。
(1)第一類命令:低危命令風險值:1
(2)第二類命令:中危命令風險值:2
(3)第三類命令:高危命令風險值:3
當然我們可以做更詳細的分支,例如:
@1、長期經驗總結
@2、蜜罐系統收集的命令
備註:1、數據由朋友提供,隱私原因,不能公開太多請見諒。2、當然這裡有很多情況,比如說黑客使用低危命令完成數據竊取操作,命令參數對整個操作行為危險程度的影響等,我這裡只是說明一種思維方式,具體的規則還需要從實踐中摸索。
數據清洗程序
@1、原始數據集:
2017-03-28T17:20:55compute068016,User1,100.14.7.169,"echo ""export PS1="
2017-03-28T17:21:52,s-coute003085,User2,100.14.7.169,exit
@2、清洗後數據形式:
Date,Time,clock,hostname,username,sip,cmd,risklevel,command
2017-03-28,17:20:55,17, p-compute068016,User2,100.14.7.169,echo,1,"""echo """"export PS1="""
#!/usr/bin/python2
# -*- coding: utf-8 -*-
import sys
import csv
csvfile2 = file("/Users/xxx/Desktop/xxx-2.csv", "wb")
writer = csv.writer(csvfile2)
csvfile = file("/Users/xxx/Desktop/xxx.csv","rb")
reader = csv.reader(csvfile)
cmdlist=[]
cmdlist2=[]
cmdlist3=[]
with open("cmd.txt", "r") as file_to_read:
for line in file_to_read.readlines():
cmdlist.append(line.strip())
with open("cmd2.txt", "r") as file_to_read:
for line in file_to_read.readlines():
cmdlist2.append(line.strip())
with open("cmd3.txt", "r") as file_to_read:
for line in file_to_read.readlines():
cmdlist3.append(line.strip())
for line in csvfile:
line = line.split(",")
m_ctime = line[0]
m_hostname = line[1]
m_username = line[2]
m_src_ip = line[3]
m_cmd=line[4].strip()
l = str(m_ctime).split("T")[0]
r = str(m_ctime).split("T")[1]
r2 = str(r).split(":")[0]
for w in cmdlist:
if w in m_cmd:
writer.writerow([l, r, r2, m_hostname, m_username, m_src_ip, "1",w])
for w in cmdlist2:
if w in m_cmd:
writer.writerow([l, r, r2, m_hostname, m_username, m_src_ip, "2",w])
for w in cmdlist3:
if w in m_cmd:
writer.writerow([l, r, r2, m_hostname, m_username, m_src_ip, "3",w])
if __name__ == "__main__":
reload(sys)
sys.setdefaultencoding("utf-8")
用戶聚類程序
根據不同用戶分別聚類,根據pandas產生的編號確定用戶username。
# -*- coding: utf-8 -*-
import pandas as pd
import numpyas np
import matplotlib.pyplotas plt
import csv
from sklearn.clusterimport KMeans
userdata =pd.DataFrame(pd.read_csv("trainresult1.csv",header=0))
tmp = np.array([userdata["no"], userdata["count"]]).T
from sklearn.clusterimport KMeans
kms = KMeans(n_clusters=3)
y = kms.fit_predict(tmp)
# 將分類結果以散點圖形式展示
fig = plt.figure(figsize=(10, 6))
plt.xlabel("no")
plt.ylabel("count")
for iin range(0, len(y)):
if (y[i] == 0):
plt.plot(tmp[i, 0], tmp[i, 1], "*r")
elif(y[i] == 1):
plt.plot(tmp[i, 0], tmp[i, 1], "sy")
elif(y[i] == 2):
plt.plot(tmp[i, 0], tmp[i, 1], "pb")
plt.show()
把用戶分為3個群。
用戶KDE概率分布程序
#coding:utf-8
import numpyas np
import matplotlib.pyplotas plt
import pandas as pd
from sklearn.neighborsimport KernelDensity
userdata =pd.DataFrame(pd.read_csv("cmdlog1000000-train.csv",header=0))
tmp = userdata.username.unique()
for line in tmp:
one_userdata = userdata[userdata["username"] == line]
two_userdata = pd.DataFrame(one_userdata[["clock","risklevel"]].groupby(["clock"], as_index=False).size()[:24])
# X=two_userdata.as_matrix()
two_userdata.plot(kind="bar", color="#d62728", alpha=0.5, rot=0)
plt.xlabel(line+u"登陸時間段")
plt.ylabel(u"登陸次數")
plt.legend()
plt.show()
根據每個用戶建立不同的概率模型,一旦發現不符合以往登陸規律,視為異常。
關聯規則挖掘(Apriori algorithm)
# -*- coding: utf-8 -*-
import pandas as pd
import numpyas np
import matplotlib.pyplotas plt
from itertoolsimport combinations
from sklearn.neighborsimport KernelDensity
defget_support(df):
pp = []
for cnumin range(1, len(df.columns)+1):
for cols in combinations(df, cnum):
s = df[list(cols)].all(axis=1).sum()
pp.append([",".join(cols), s])
sdf = pd.DataFrame(pp, columns=["Pattern", "Support"])
return sdf
userdata =pd.DataFrame(pd.read_csv("xxx-train.csv",header=0))
tmp = userdata.username.unique()
for line in tmp:
one_userdata = userdata[userdata["username"] == line]
s = get_support(one_userdata)
print s[s.Support>= 3]
登陸用戶和登陸源IP關係KDE統計分析
登陸用戶和命令關係KDE統計分析
登陸用戶和登陸伺服器IP關係KDE統計分析
這些都作為統計模型,如果發現異常告警。
0x05、總結
本文主要通過傳統的統計分析模型看一下UBA應該怎麼處理數據,下一篇繼續講解持久化、工程化的方法和實踐路徑。
※滲透測試之cisco路由器在滲透中的利用
※如何從 iOS 設備中導出媒體文件
TAG:嘶吼RoarTalk |