Select Git revision
diagramParser.py
-
martiivGylden authoredmartiivGylden authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
diagramParser.py 6.79 KiB
import pandas as pd
import ERFormatConstants as const
import components as component
import dynamics as dynamic
import logging as log
# Function will parse a csv file and extract the necessary information, this is step 1 of the parse
def parseDiagramFile(csvFile):
df = pd.read_csv(csvFile)
df.drop(["Shape Library", "Page ID", "Contained By", "Group", "Comments", "property 1"], axis=1, inplace=True) #Removing unecessary data
# List containing all threats and their descriptions
threats = {}
#threats = parseThreats(df, threats)
#List containing all consequences
consequences = {}
#consequences = parseConsequences(df, consequences)
#List containing all metrics
metrics = []
#List containing all attacks
attacks = []
parseDynamic(df, metrics)
def parseThreats(df, threatDict):
for i in range(len(df)):
if df[const.ThreatValue][i] == const.Threat: #Creates threat object
threat = component.Threat(
df[const.ThreatIDValue][i], # ID from ER
df[const.Id][i], # LucidChart ID
df[const.ThreatSourceValue][i],
df[const.ThreatDescriptionValue][i],
df[const.LikelihoodValue][i],
df[const.VulnerabilityValue][i]
)
threatDict[df.Id[i]] = threat
return threatDict
#Parses consequences and adds it to dictionary
def parseConsequences(df, consequenceDict):
for i in range(len(df)):
if df[const.ConsequenceValue][i] == const.Consequence:
consequence = component.Consequence(
df[const.ConsequenceIDValue][i],
df[const.Id][i],
df[const.ConsequenceDescriptionValue][i],
df[const.ConsequenceScoreValue][i],
df[const.AffectedComponentValue][i]
)
consequenceDict[df.Id[i]] = consequence
return consequenceDict
def parseAttacks(df, attackDict):
for i in range(len(df)):
if df[const.AttackValue][i] == const.Attack:
attack = component.Attack(
df[const.Id][i],
df[const.AttackedComponentValue][i],
df[const.AttackDescriptionValue][i],
)
attackDict[df.Id[i]] = attack
return attackDict
#Parses metrics components and adds it to list
def parseDynamic(df, metricList):
for i in range(len(df)): # Iterates through the dataframe
if df[const.textArea1][i] == const.Dynamics: # If the component is a dynamic component
if df[const.textArea3][i] == const.ThreatDynamic: # If the dynamic component is a threat
threatDynamic = dynamic.BowtieDynamic(
df[const.Id][i], # Component ID LucidChart
df[const.textArea3][i] # Name of dynamic
)
threatDynamic.metrics = extractMetrics(df, i, 4) # Extracts metrics from the dynamic component
joinMetrcs(threatDynamic.metrics, metricList) # Adds the metrics to the global metric list
elif df[const.textArea3][i] == const.ConsequenceDynamic:
consequenceDynamic = dynamic.BowtieDynamic(
df[const.Id][i], # Component ID LucidChart
df[const.textArea3][i] # Name of dynamic
)
consequenceDynamic.metrics = extractMetrics(df, i, 4) # Extracts metrics from the dynamic component
joinMetrcs(threatDynamic.metrics, metricList) # Adds the metrics to the global metric list
elif df[const.textArea3][i] == const.AttackDynamic:
attackDynamic = dynamic.BowtieDynamic(
df[const.Id][i], # Component ID LucidChart
df[const.textArea3][i] # Name of dynamic
)
attackDynamic.metrics = extractMetrics(df, i, 4)
joinMetrcs(threatDynamic.metrics, metricList) # Adds the metrics to the global metric list
elif df[const.textArea3][i] == const.ERDynamic:
erDynamic = dynamic.ERDynamic(
df[const.id][i], # Component ID LucidChart
df.textArea3[i], # Name of dynamic
)
erDynamic.metrics = extractMetrics(df, i, 8)
joinMetrcs(threatDynamic.metrics, metricList) # Adds the metrics to the global metric list
def extractMetrics(df, index, startRange):
for j in range(startRange, len(df.columns),2): # Parse all text areas to find metrics
listOfMetrics = []
metricID = "Text Area "+str(j)
metricName = "Text Area "+str(j+1)
if pd.isnull(df[metric][index]) == False: # If the text area is not empty
print("Metric: ID", df[metricID][index], "Name: ", df[metricName][index])
metric = dynamic.Metric(df[metricID][index], df[metricName][index])
listOfMetrics.append(metric)
else:
j=0
break # First empty field indicates no more metrics
return listOfMetrics # Returns metric found in the dynamic component
"""_summary_
Function will use a local metric list and insert the local metrics into a global metric list
containing all the metrics in the threat landscape
"""
def joinMetrcs(localMetrics, globalMetrics):
duplicateMetrics = 0 # Counter for duplicate metrics per function run
for i in range(globalMetrics):
for j in range(localMetrics):
if globalMetrics[i].name == localMetrics[j].name: # Local metric already exists in the global metric list
duplicateMetrics += 1
break # Check next entry
else:
globalMetrics.append(localMetrics[j]) # Add the local metric to the global metric list
log.info("New local metric added to global metric list, metric: ", localMetrics[j].name)
log.info("Added all metrics in local list \n, number of duplicate metrics: ", duplicateMetrics, "\n Number of new metrics: ", len(localMetrics)-duplicateMetrics)