Appendix E- Streaming Posture API Client Sample Code (Python)

The following table contains output parameters and their descriptions:

Output Parameter Name

Description

id

Posture record ID

instance

Instance type/name

policyId

Policy ID

controlId

Control ID

technologyId

Technology ID

status

Posture Status

previousStatus

Previous Posture Status

firstFailDate

Posture first fail date

lastFailDate

Posture last fail date

firstPassDate

Posture pass date

lastPassDate

Posture last date

postureModifiedDate

Posture last modified date

lastEvaluatedDate

Posture last evaluated date

created

Posture creation date

hostId

Host ID

ip

Asset instance IP address

trackingMethod

Asset tracking method

os

Asset instance operating system

osCpe

OS Platform Enumeration

dns

Host ID

qgHostid

QualysGuard Host ID

networkId

Network ID

networkName

Network name

complianceLastScanDate

Policy Compliance last scan date

customerUuid

Customer UUID

customerId

Customer ID

assetId

Asset ID

technology : id

Technology ID

technology : name

Technology name

criticality : label

Control criticality label

criticality : value

Control criticality value

evidence : expectedValues

Posture evidence expected values

evidence : currentValues

Posture evidence current values

causeOfFailure : missing

Failed Posture  cause of failure missing values

causeOfFailure : unexpected

Failed Posture  unexpected value for failure result

 

Following is the sample code to demonstrate how to use Qualys Policy Compliance Streaming Posture API to download host posture by using Python script.

# You need to install requests library such as PIP Install Requests.

import requests
from requests.exceptions import Timeout
import json
import datetime
import time
import sys
import zlib
 
# Function to handle various errors
def handlerError(size, error):
    print('Total size downloaded %.2fm'%size/1048576)                   #Print total data downloaded in MBs
    print(type(error))
    print(error.args)
    print(error)
 
# First authenticate the user to get the token needed for subsequent API calls
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
authUrl='https://gateway.<assigned URL>/auth'                 # data = {'username':'username', 'password':'password','token':true}   # Replace username and password with actual userid and password. For token, the value can be ’true’ or ’True’.
authResp=requests.post(authUrl, data=data, headers=headers, verify=False)
token=authResp.content.decode('utf-8')
 
# Use the token returned by the authentication call
# Retrieve the host IDs associated with the particular policy
headers={
    'accept':'application/json',
    'Authorization': 'Bearer '+token}
 
params={'policyId':'policyid'}                                             #Replace with the actual policy ID, pass multiple policy IDs as comma-separated list
 
url='https://gateway.<assigned URL>/pcrs/1.0/posture/hostids'  response=requests.get(url, params=params, headers=headers, verify=False)
 
# check the response of host IDs API
if (response.status_code!=200):
    print("Unexpected response from hostids API: ")
    print(response.status_code)
    exit()
 
# Pass the host IDs retrieved in the previous APIs to posture info API
headers={
    'accept':'application/json',
    'Authorization': 'Bearer '+token,
    'Content-Type':'application/json'}
 
postureUrl = 'https://gateway.<assigned URL>/pcrs/1.0/posture/postureInfo?evidenceRequired=1&compressionRequired=1' 
# If compression is used and you want to decompress the data on the fly
d = zlib.decompressobj(16+zlib.MAX_WBITS)
 
#with open("output.json", 'wb') as f: #If compression is used the zip file is to be stored
with open("output.json", 'w') as f: #If compression is used and decompressing on the fly or no compression used
    print('API Invoked at:')
    print(datetime.datetime.now())
    size=0.0
    try:
        with requests.post(url=postureUrl, headers=headers, data=response.content, stream=True, timeout=3600, verify=False) as postureStream:
            if (postureStream.status_code!=200):
                print("Unexpected response from posture API: ")
                print(postureStream.status_code)
                exit()
            print('First response received at: ')
            start = time.time()
            print(datetime.datetime.now())
            for chunk in postureStream.iter_content(chunk_size=1048576):
                if chunk:
                    chunk_size=len(chunk)
                    end=time.time()+1
                    outstr = d.decompress(chunk) # If compression is not used or storing zip file, please comment this line
                    size += len(outstr)
                    print('Download speed: [%.2fkbps], Chunk size: [%.2fk], total size: [%.2fm] at time %s'%((size/(end-start))/1024,
                                            chunk_size/1024,size/1048576,datetime.datetime.now().strftime("%H:%M:%S")),end="\r")
                    f.write(outstr.decode())
                    f.flush()
            postureStream.close
        f.flush()
        print('\nAPI finished at')
        print(datetime.datetime.now())
    except Exception as e:
        handlerError(size. e)
    except ProtocolError as pe:
        handlerError(size, pe)
f.close()

# Following is a sample code to demonstrate how to use Qualys Policy Compliance Streaming Posture API for concurrent processing of host posture by using Python script. 
# You need to install requests library such as PIP install requests, PIP install json_stream, and PIP install dicttoxml.
import requests
from requests.exceptions import Timeout
import json
import datetime
import time
import sys
import zlib
import threading
import json_stream

def worker():
    with open("output.json", 'r') as f:
        data = json_stream.load(f)
        count=0
        for posture in data.persistent():
            print('Count [%d] Control Id [%s] IP[%s] Criticality [%s] Status[%s]'%(count, 
                                    posture['controlId'], posture['ip'], posture['criticality']['label'],posture['status']),end="\r")
            count += 1 
            
    f.close()

 

# Function to handle various errors
def handlerError(size, error):
    print(type(error))
    print(error.args)
    print(error)

# First authenticate the user to get the token needed for subsequent API calls
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
authUrl='https://gateway.<assigned URL>/auth'                 # data = {'username': <USER NAME>, 'password':<PASSWORD>,'token':true}   # Replace username and password with actual user ID and password. For token, the value can be ’true’ or ’True’.
authResp=requests.post(authUrl, data=data, headers=headers, verify=False)
token=authResp.content.decode('utf-8')

# Use the token returned by the authentication call
# Retrieve the host ids associated with the particular policy
headers={
    'accept':'application/json',
    'Authorization': 'Bearer '+token}

params={'policyId':'xxx'}                                             #Replace with the policyid, pass multiple policyids as comma separated list

url='https://gateway.<assigned URL>/pcrs/1.0/posture/hostids'
response=requests.get(url, params=params, headers=headers, verify=False)

# check the response of host ids API
if (response.status_code!=200):
    print("Unexpected response from hostids API: ")
    print(response.status_code)
    exit()

# Pass the host ids retrieved in the previous APIs to posture info API
headers={
    'accept':'application/json',
    'Authorization': 'Bearer '+token,
    'Content-Type':'application/json'}

postureUrl = 'https://gateway.<assigned URL>/pcrs/1.0/posture/postureInfo?evidenceRequired=0&compressionRequired=1' 
# If compression is used and you want to decompress the data on the fly
d = zlib.decompressobj(16+zlib.MAX_WBITS)

apiTime = datetime.datetime.now()
with open("output.json", 'w') as f: #If compression is used and decompressing on the fly or no compression used
    print('API Invoked at:')
    print(datetime.datetime.now())
    t1 = threading.Thread(target=worker, daemon=True)
    try:
        with requests.post(url=postureUrl, headers=headers, data=response.content, stream=True, timeout=3600, verify=False) as postureStream:
            if (postureStream.status_code!=200):
                print("Unexpected response from posture API: ")
                print(postureStream.status_code)
                exit()
            print('First response received at: ')
            start = time.time()
            print(datetime.datetime.now())
            
            count=0
            for chunk in postureStream.iter_content(chunk_size=1048576):
                if chunk:
                    outstr = d.decompress(chunk) # If compression is not used or storing zip file, please comment this line
                    f.write(outstr.decode())
                    f.flush()
                    if count == 0:
                        # turn-on the worker thread
                        t1.start()
                        count += 1
            
            postureStream.close
        apiTime = datetime.datetime.now()
    except Exception as e:
        handlerError(size. e)
    except ProtocolError as pe:
        handlerError(size, pe)
f.close()
t1.join()
print('\nAPI Finished at [%s] All the procesing completed at [%s]'%(apiTime,datetime.datetime.now()))