专栏名称: 渗透师老A
不一样的角度学习五花八门的渗透技巧,了解安全圈背后的故事
目录
相关文章推荐
51好读  ›  专栏  ›  渗透师老A

POC批量验证Python脚本编写

渗透师老A  · 公众号  ·  · 2022-03-08 11:47

正文

编写目的

批量验证poc,Python代码练习

需求分析



1、poc尽可能简单。
2、多线程。
3、联动fofa获取目标。
4、随机请求头.


实现过程



脚本分为三个模块,获取poc及目标、多线程批量请求验证、输出结果。其中批量请求验证包括构造多线程,修改请求参数,发送请求三个部分。


Main函数



在main函数中,主要有三个部分获取poc及目标,多线程(将目标填充到队列,创建多线程并启动)、输出结果。
具体实现如下:

  1. def main():

  2. # 响应Ctrl+C停止程序

  3. signal.signal(signal.SIGINT, quit)

  4. signal.signal(signal.SIGTERM, quit)

  5. showpocs()


  6. ## 获取目标

  7. targetList = getTarget()


  8. ## 多线程批量请求验证

  9. thread(targetList)


  10. ## 输出结果

  11. putTarget(List)

获取目标


关于目标来源,设计单个目标、从文件中读取多个目标以及根据FoFa语法从FOFA_API中获取目标三种方式。



定义函数getTarget,函数分为两个部分

第一部分为根据 -f Fofa语法 获取目标,默认数目为30条,

第二部分为根据 -u url / -i file / -f num(数目,默认为10) 获取要请求验证的目标,两部分以是否传参poc参数区别,最后返回一个targetList列表。


具体实现如下:

  1. def getTarget():

  2. targetList=[]

  3. count=0

  4. if result.poc==None:

  5. if result.outfile!=None and result.fofa!=None :

  6. # FOFA读取目标

  7. if result.fofa!=None:

  8. qbase=result.fofa

  9. qbase64=str(base64.b64encode(qbase.encode("utf-8")), "utf-8")

  10. print("FOFA搜索:"+qbase)

  11. fofa_url="https://fofa.so/api/v1/search/all?email="+email+"&key="+key+"&qbase64="+qbase64+"&fields=title,host,ip,port,city&size=30"

  12. try:

  13. res=requests.get (fofa_url)

  14. results = json.loads(res.text)

  15. filepath=result.outfile

  16. with open(filepath,'w') as targets:

  17. for i in results['results']:

  18. targets.write(i[1]+'\n')

  19. print(i[1])

  20. count+=1

  21. print("搜索结果有"+str(count)+"条,已保存在"+filepath+"里!")

  22. except Exception as e:

  23. print(e)

  24. sys.exit()

  25. else:

  26. if result.url!=None or result.file!=None or result.fofa!=None:

  27. # 单个目标

  28. if result.url!=None:

  29. targetList.append(result.url)

  30. # 文件读取目标

  31. if result.file!=None:

  32. try:

  33. filepath=result.file

  34. with open(filepath,'r') as targets:

  35. for target in targets.readlines():

  36. targetList.append(target.strip())

  37. except Exception as e:

  38. print(e)

  39. # FOFA读取目标

  40. if result.fofa!=None:

  41. qbase=""

  42. pocName = result.poc

  43. with open('poc.json',encoding='UTF-8') as f:

  44. data = json.load(f)

  45. for poc in data:

  46. if pocName == poc:

  47. qbase=data[poc]['fofa']

  48. qbase64=str(base64.b64encode(qbase.encode("utf-8")), "utf-8")

  49. try:

  50. fofa_url="https://fofa.so/api/v1/search/all?email="+email+"&key="+key+"&qbase64="+qbase64+"&fields=title,host,ip,port,city&size="+str(result.fofa)

  51. res=requests.get(fofa_url)

  52. results = json.loads(res.text)

  53. print("FOFA搜索:"+qbase)

  54. print("搜索结果:"+str(result.fofa)+"条")

  55. for i in results['results']:

  56. targetList.append(i[ 1])

  57. # print(targetList)

  58. except Exception as e:

  59. print(e)

  60. return targetList

  61. else :

  62. sys.exit("参错有误!缺少目标!")

批量请求验证


定义thread函数,封装多线程请求相关代码,需传入获取到的目标参数targetList。
具体实现如下:

  1. def thread(targetList):

  2. ## 获取poc

  3. poc=poc_load()


  4. ## 填充队列

  5. queueLock.acquire()

  6. for target in targetList:

  7. targetQueue.put(target)

  8. queueLock.release()


  9. ## 创建线程

  10. threadList = []

  11. threadNum=result.threadNum

  12. for i in range(0,threadNum):

  13. t=reqThread(targetQueue,poc)

  14. t.setDaemon(True)

  15. threadList.append(t)

  16. for i in threadList:

  17. i.start()


  18. # 等待所有线程完成

  19. for t in threadList:

  20. t.join()

加载POC


请求验证必须使用 -p pocName 参数指定要使用的POC,所有POC在poc.json文件中存储。

具体实现如下:

  1. # 加载poc

  2. def poc_load():

  3. if result.poc!=None:

  4. poc = result.poc

  5. isPoc = False # POC是否存在

  6. # 读取json文件

  7. with open('poc.json',encoding='UTF-8') as f:

  8. data = json.load(f)

  9. for key in data:

  10. if poc == key:

  11. isPoc=True

  12. if isPoc==False:

  13. print("POC 不存在!")

  14. sys.exit("请通过--show查看poc列表!")

  15. else:

  16. return data[poc]

  17. else:

  18. pass

多线程类


定义reqThread线程类,传入队列以及poc两个参数,封装req请求方法。
具体实现如下:

  1. class reqThread (threading.Thread):

  2. def __init__(self, q,poc):

  3. threading.Thread.__init__(self)

  4. self.q = q

  5. self.poc=poc

  6. def run(self):

  7. try:

  8. while not self.q.empty():

  9. queueLock.acquire()

  10. target=self.q.get()

  11. queueLock.release()

  12. if self.req(target):

  13. print(target+" is vuln !")

  14. List.append(target)

  15. else:

  16. pass

  17. except Exception as e:

  18. pass

  19. def req(self,url):

  20. poc=self.poc

  21. payload=urlParse(url)+poc['request']['url']

  22. res=requests.request(method=poc['request']['method'],url=payload,headers= randomheaders(poc),proxies=getProxy(),data=poc['request']['data'],verify=False,timeout=5)

  23. if res.status_code==200 and poc['request']['confirm'] in res.text:

  24. return True

  25. else:

  26. return False

其中在req中的请求方法内,存在三个修改请求的方法。

urlParse

对获取到的目标进行文本处理。

  1. # 处理url

  2. def urlParse(url):

  3. if "https://" not in url:

  4. if "http://" in url:

  5. url=url

  6. else:

  7. url="http://"+url

  8. return url

getProxy

指定请求代理。

  1. # 代理

  2. def urlParse(url):

  3. if "https://" not in url:

  4. if "http://" in url:

  5. url=url

  6. else:

  7. url="http://"+url

  8. return url

randomHeaders

添加随机User-Agent、referer、XFF等请求头参数值。

  1. def randomHeaders(poc):

  2. headers={}

  3. uaList=[

  4. 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36',

  5. 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X; zh-CN) AppleWebKit/537.51.1 (KHTML, like Gecko) Mobile/17D50 UCBrowser/12.8.2.1268 Mobile AliApp(TUnionSDK/0.1.20.3)',

  6. 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',

  7. 'Mozilla/5.0 (Linux; Android 8.1.0; OPPO R11t Build/OPM1.171019.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/76.0.3809.89 Mobile Safari/537.36 T7/11.19 SP-engine/2.15.0 baiduboxapp/11.19.5.10 (Baidu; P1 8.1.0)',

  8. 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',

  9. 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 SP-engine/2.14.0 main%2F1.0 baiduboxapp/11.18.0.16 (Baidu; P2 13.3.1) NABar/0.0',

  10. 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/17.17134',

  11. 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',

  12. 'Mozilla/5.0 (iPhone; CPU iPhone OS 12_4_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.10(0x17000a21) NetType/4G Language/zh_CN' ,

  13. 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',

  14. 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',

  15. 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',

  16. 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36',

  17. 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',

  18. 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.106 Safari/537.36',

  19. ]

  20. refList=[

  21. "www.baidu.com"

  22. ]

  23. xffList=[

  24. '127.0.0.1',

  25. '51.77.144.148',

  26. '80.93.212.46',

  27. '109.123.115.10',

  28. '187.44.229.50',

  29. '190.14.232.58',

  30. '5.166.57.222',

  31. '36.94.142.165',

  32. '52.149.152.236',

  33. '68.15.147.8',

  34. '188.166.215.141',

  35. '190.211.82.174',

  36. '101.51.139.179'

  37. ]

  38. if 'User-Agent' in poc['request']['headers']:

  39. if poc['request']['headers']['User-Agent'].strip()!='':

  40. headers['User-Agent']=poc['request']['headers']['User-Agent']

  41. else:

  42. headers['User-Agent']=random.choice(uaList)

  43. if 'referer' in poc['request']['headers']:

  44. if poc['request']['headers']['referer'].strip()!='':

  45. headers['referer']=poc['request']['headers']['referer']

  46. else:

  47. headers['referer']=random.choice(refList)

  48. if 'X-Forwarded-For' in poc['request']['headers']:

  49. if poc['request']['headers'][ 'User-Agent'].strip()!='':

  50. headers['X-Forwarded-For']=poc['request']['headers']['X-Forwarded-For']

  51. else:

  52. headers['X-Forwarded-For']=random.choice(xffList)

  53. for key in poc['request']['headers']:

  54. if key != "referer" and key != "User-Agent" and key != "X-Forwarded-For":

  55. headers[key]=poc['request']['headers'][key]

  56. return headers

输出结果

义全 局变量Li st,储存要输出的 目标,定义输出方法putTarget。
具体实现如下:

  1. List=[]

  2. ## 输出

  3. def putTarget(resultList):

  4. if result.file!=None or result.fofa!=None:

  5. if len(resultList)!=0 :

  6. if result.outfile != None :

  7. filepath=result.outfile

  8. with open(filepath,'w') as targets:

  9. for target in resultList:

  10. targets.write(target+'\n')

  11. print("验证结果有"+str(len(resultList))+"条,已保存在"+filepath+"里!")

  12. else:

  13. print("没有发现存在漏洞的目标!")

  14. else:

  15. pass

其他

全局变量

  1. # 忽略https告警

  2. requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

  3. ## 队列

  4. targetQueue = queue.Queue(100)

  5. ## 锁

  6. queueLock = threading.Lock()

  7. # 结果

  8. List=[]

  9. # FoFA

  10. email=""

  11. key=""

命令行读取参数

  1. arg = ArgumentParser(description='POC_Verify')

  2. arg.add_argument('-u', dest='url',help='Target URL',type=str)

  3. arg.add_argument('-i', '--file', dest='file',help='Scan multiple targets given in a textual file',type=str)

  4. arg.add_argument('-f',"--fofa", dest='fofa',help='fofaquery Nums/String Example if poc -f 10 else -f "abc" default=30',default=10)

  5. arg.add_argument('-p', dest='poc',help=' Load POC file from poc.json')

  6. arg.add_argument('-proxy', dest='proxy',help='Use a proxy to connect to the target URL Example : -proxy http:127.0.0.1:8080',type=str)

  7. arg.add_argument('-t', dest ='threadNum',help='the thread_count,default=10', type=int, default=10)

  8. arg.add_argument('-show', dest='show', help='show all pocs',nargs='?',const='all',type=str)

  9. arg.add_argument('-o', '--outfile', dest='outfile', help='the file save result', default='result.txt',type=str)

  10. result = arg.parse_args()

poc详情显示

  1. ## 显示poc

  2. def showpocs():

  3. isPoc = False

  4. if result.show != None:

  5. # 读取json文件

  6. with open('poc.json',encoding='UTF-8') as f:

  7. data = json.load(f)

  8. if result.show== "all":

  9. print("pocname".ljust(20),"description".ljust(20))

  10. print("----------------------------------------------")

  11. for key in data:

  12. print(key.ljust(20),data[key]['name'].ljust(20))

  13. else:

  14. if result.show in data:

  15. print("pocname".ljust(20),"description".ljust(20))

  16. print("----------------------------------------------")

  17. print(result.show.ljust(20),data[result.show]['name'].ljust(20))

  18. sys.exit()

  19. else:

  20. pass

Ctrl+C结束线程


  1. # 停止程序

  2. def quit(signum, frame):

  3. print('You choose to stop me.')

  4. sys.exit()

  5. def main():

  6. # 响应Ctrl+C停止程序

  7. signal.signal(signal.SIGINT, quit)

  8. signal.signal(signal.SIGTERM, quit)

poc.json文件


poc本质为一次HTTP请求,本着简单的原则,仅设计名称、联动fofa的语法、请求头、请求内容、以及验证漏洞存在回显的内容5个字段。

  1. {

  2. "pocname": {

  3. "name":"漏洞描述",

  4. "fofa":"fofa搜索字符串,特殊符号需要转义",

  5. "request": {

  6. "method": "",

  7. "url":"",

  8. "headers": {

  9. "referer": "",

  10. "User-Agent": "",

  11. "X-Forwarded-For": "",







请到「今天看啥」查看全文