The following table contains output parameters and their descriptions:
Output Parameter Name |
Description |
---|---|
id |
Posture record ID |
instance |
Instance type/name |
policyId |
Policy ID |
controlId |
Control ID |
technologyId |
Technology ID |
status |
Posture Status |
previousStatus |
Previous Posture Status |
firstFailDate |
Posture first fail date |
lastFailDate |
Posture last fail date |
firstPassDate |
Posture first pass date |
lastPassDate |
Posture last pass date |
postureModifiedDate |
Posture last modified date |
lastEvaluatedDate |
Posture last evaluated date |
created |
Posture creation date |
hostId |
Host ID |
ip |
Asset instance IP address |
trackingMethod |
Asset tracking method |
os |
Asset instance operating system |
osCpe |
OS Common Platform Enumeration (CPE) |
dns |
Asset instance DNS name |
qgHostid |
QualysGuard Host ID |
networkId |
Network ID |
networkName |
Network name |
complianceLastScanDate |
Policy Compliance last scan date |
customerUuid |
Customer UUID |
customerId |
Customer ID |
assetId |
Asset ID |
technology : id |
Technology ID |
technology : name |
Technology name |
criticality : label |
Control criticality label |
criticality : value |
Control criticality value |
evidence : expectedValues |
Posture evidence expected values |
evidence : currentValues |
Posture evidence current values |
causeOfFailure : missing |
Failed Posture cause of failure missing values |
causeOfFailure : unexpected |
Failed Posture unexpected value for failure result |
The following sample code demonstrates how to use the Qualys Policy Compliance Streaming Posture API to download host posture with a Python script.
# You need to install the requests library first, for example: pip install requests
import requests
from requests.exceptions import Timeout
import json
import datetime
import time
import sys
import zlib
# Function to handle various errors
def handlerError(size, error):
    """Report a failed download: how much data arrived and what went wrong.

    Args:
        size: Number of bytes downloaded before the error occurred.
        error: The exception that interrupted the download.
    """
    # Parenthesize the division: '%' and '/' share precedence and bind
    # left-to-right, so without the parentheses the string would be formatted
    # first and then divided by an int, raising TypeError.
    print('Total size downloaded %.2fm' % (size / 1048576))  # Print total data downloaded in MBs
    print(type(error))
    print(error.args)
    print(error)
# First authenticate the user to get the token needed for subsequent API calls
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
authUrl = 'https://gateway.<assigned URL>/auth'
# Replace username and password with actual userid and password.
# For token, the value can be 'true' or 'True' (sent as a form string).
data = {'username': 'username', 'password': 'password', 'token': 'true'}
# NOTE(review): verify=False disables TLS certificate verification while
# credentials are being sent; remove it unless the environment requires it.
authResp = requests.post(authUrl, data=data, headers=headers, verify=False)
# Stop on a failed login instead of using an error body as the bearer token.
if not authResp.ok:
    print("Unexpected response from auth API: ")
    print(authResp.status_code)
    sys.exit(1)
token = authResp.content.decode('utf-8')
# Use the token returned by the authentication call
# Retrieve the host IDs associated with the particular policy
headers = {
    'accept': 'application/json',
    'Authorization': 'Bearer ' + token,
}
params = {'policyId': 'policyid'}  # Replace with the actual policy ID, pass multiple policy IDs as comma-separated list
url = 'https://gateway.<assigned URL>/pcrs/1.0/posture/hostids'
response = requests.get(url, params=params, headers=headers, verify=False)
# check the response of host IDs API
if response.status_code != 200:
    print("Unexpected response from hostids API: ")
    print(response.status_code)
    # sys.exit is always available (the bare exit() helper is injected by
    # the site module) and a non-zero status signals the failure to callers.
    sys.exit(1)
# Pass the host IDs retrieved in the previous APIs to posture info API
import codecs

headers = {
    'accept': 'application/json',
    'Authorization': 'Bearer ' + token,
    'Content-Type': 'application/json',
}
postureUrl = 'https://gateway.<assigned URL>/pcrs/1.0/posture/postureInfo?evidenceRequired=1&compressionRequired=1'
# If compression is used and you want to decompress the data on the fly
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
# A 1 MB network chunk can end in the middle of a multi-byte UTF-8 character;
# the incremental decoder carries the partial bytes over to the next chunk
# instead of raising UnicodeDecodeError.
utf8 = codecs.getincrementaldecoder('utf-8')()
size = 0.0
#with open("output.json", 'wb') as f: #If compression is used the zip file is to be stored
with open("output.json", 'w') as f:  # If compression is used and decompressing on the fly or no compression used
    print('API Invoked at:')
    print(datetime.datetime.now())
    try:
        with requests.post(url=postureUrl, headers=headers, data=response.content,
                           stream=True, timeout=3600, verify=False) as postureStream:
            if postureStream.status_code != 200:
                print("Unexpected response from posture API: ")
                print(postureStream.status_code)
                sys.exit(1)
            print('First response received at: ')
            start = time.time()
            print(datetime.datetime.now())
            for chunk in postureStream.iter_content(chunk_size=1048576):
                if not chunk:
                    continue
                chunk_size = len(chunk)
                # The one-second offset keeps (end - start) away from zero
                # when the first chunk arrives immediately.
                end = time.time() + 1
                outstr = d.decompress(chunk)  # If compression is not used or storing zip file, please comment this line
                size += len(outstr)
                print('Download speed: [%.2fkbps], Chunk size: [%.2fk], total size: [%.2fm] at time %s' % (
                    (size / (end - start)) / 1024,
                    chunk_size / 1024,
                    size / 1048576,
                    datetime.datetime.now().strftime("%H:%M:%S")), end="\r")
                f.write(utf8.decode(outstr))
                f.flush()
            # Drain whatever the decompressor and decoder are still buffering.
            tail = d.flush()
            size += len(tail)
            f.write(utf8.decode(tail, final=True))
            f.flush()
        # The response is closed by its "with" block; no explicit close needed.
        print('\nAPI finished at')
        print(datetime.datetime.now())
    except Exception as e:
        # Single top-level handler: transport errors raised by requests while
        # streaming, zlib errors, and decode errors are all Exception
        # subclasses. SystemExit from sys.exit() above is deliberately not
        # caught here.
        handlerError(size, e)
# The file is closed by its "with" block.
# The following sample code demonstrates how to use the Qualys Policy Compliance Streaming Posture API for concurrent processing of host posture with a Python script.
# You need to install the required libraries first, for example: pip install requests, pip install json-stream, and pip install dicttoxml.
import requests
from requests.exceptions import Timeout
import json
import datetime
import time
import sys
import zlib
import threading
import json_stream
def worker():
    """Stream-parse output.json and print a one-line summary per posture record.

    Intended to run in a background thread while the main thread is still
    writing the file.

    NOTE(review): nothing here waits for the writer; if this reader reaches
    the current end of the file before the download finishes, the parser
    presumably treats it as end of input. Confirm the behaviour on slow
    downloads.
    """
    with open("output.json", 'r') as f:
        data = json_stream.load(f)
        # persistent() keeps each parsed record so its fields can be read in
        # any order, regardless of their order in the stream.
        for count, posture in enumerate(data.persistent()):
            print('Count [%d] Control Id [%s] IP[%s] Criticality [%s] Status[%s]' % (
                count,
                posture['controlId'], posture['ip'],
                posture['criticality']['label'], posture['status']), end="\r")
    # The file is closed by the "with" block; no explicit close is needed.
# Function to handle various errors
def handlerError(size, error):
    """Print the type, arguments, and message of a failed API call.

    Args:
        size: Accepted so callers can pass the downloaded byte count; this
            version does not print it.
        error: The exception that interrupted the download.
    """
    print(type(error))
    print(error.args)
    print(error)
# First authenticate the user to get the token needed for subsequent API calls
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
authUrl = 'https://gateway.<assigned URL>/auth'
# Replace <USER NAME> and <PASSWORD> with actual user ID and password.
# For token, the value can be 'true' or 'True' (sent as a form string).
data = {'username': '<USER NAME>', 'password': '<PASSWORD>', 'token': 'true'}
# NOTE(review): verify=False disables TLS certificate verification while
# credentials are being sent; remove it unless the environment requires it.
authResp = requests.post(authUrl, data=data, headers=headers, verify=False)
# Stop on a failed login instead of using an error body as the bearer token.
if not authResp.ok:
    print("Unexpected response from auth API: ")
    print(authResp.status_code)
    sys.exit(1)
token = authResp.content.decode('utf-8')
# Use the token returned by the authentication call
# Retrieve the host ids associated with the particular policy
headers = {
    'accept': 'application/json',
    'Authorization': 'Bearer ' + token,
}
params = {'policyId': 'xxx'}  # Replace with the policyid, pass multiple policyids as comma separated list
url = 'https://gateway.<assigned URL>/pcrs/1.0/posture/hostids'
response = requests.get(url, params=params, headers=headers, verify=False)
# check the response of host ids API
if response.status_code != 200:
    print("Unexpected response from hostids API: ")
    print(response.status_code)
    # sys.exit is always available (the bare exit() helper is injected by
    # the site module) and a non-zero status signals the failure to callers.
    sys.exit(1)
# Pass the host ids retrieved in the previous APIs to posture info API
import codecs

headers = {
    'accept': 'application/json',
    'Authorization': 'Bearer ' + token,
    'Content-Type': 'application/json',
}
postureUrl = 'https://gateway.<assigned URL>/pcrs/1.0/posture/postureInfo?evidenceRequired=0&compressionRequired=1'
# If compression is used and you want to decompress the data on the fly
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
# A 1 MB network chunk can end in the middle of a multi-byte UTF-8 character;
# the incremental decoder carries the partial bytes over to the next chunk
# instead of raising UnicodeDecodeError.
utf8 = codecs.getincrementaldecoder('utf-8')()
apiTime = datetime.datetime.now()
size = 0  # decompressed bytes written so far; reported by the error handler
workerStarted = False  # join() is only legal on a thread that was started
t1 = threading.Thread(target=worker, daemon=True)
with open("output.json", 'w') as f:  # If compression is used and decompressing on the fly or no compression used
    print('API Invoked at:')
    print(datetime.datetime.now())
    try:
        with requests.post(url=postureUrl, headers=headers, data=response.content,
                           stream=True, timeout=3600, verify=False) as postureStream:
            if postureStream.status_code != 200:
                print("Unexpected response from posture API: ")
                print(postureStream.status_code)
                sys.exit(1)
            print('First response received at: ')
            print(datetime.datetime.now())
            for chunk in postureStream.iter_content(chunk_size=1048576):
                if not chunk:
                    continue
                outstr = d.decompress(chunk)  # If compression is not used or storing zip file, please comment this line
                size += len(outstr)
                f.write(utf8.decode(outstr))
                f.flush()
                if not workerStarted:
                    # turn-on the worker thread once the first chunk is on disk
                    t1.start()
                    workerStarted = True
            # Drain whatever the decompressor and decoder are still buffering.
            tail = d.flush()
            size += len(tail)
            f.write(utf8.decode(tail, final=True))
            f.flush()
        # The response is closed by its "with" block; no explicit close needed.
        apiTime = datetime.datetime.now()
    except Exception as e:
        # Single top-level handler: transport errors raised by requests while
        # streaming, zlib errors, and decode errors are all Exception
        # subclasses. SystemExit from sys.exit() above is deliberately not
        # caught here.
        handlerError(size, e)
# The file is closed by its "with" block.
# Joining a thread that was never started raises RuntimeError, so only wait
# for the worker when at least one chunk was received.
if workerStarted:
    t1.join()
print('\nAPI Finished at [%s] All the procesing completed at [%s]'%(apiTime,datetime.datetime.now()))