Select Git revision
diagnostics.go
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
diagramParser.py 7.33 KiB
import pandas as pd
import ERFormatConstants as const
import components as component
import dynamics as dynamic
import logging as log
import matrices as matrix
# Parses a CSV file and extracts the necessary information; this is step 1 of the parse.
def parseDiagramFile(csvFile) -> component.Diagram:
    """Parse a diagram CSV export into a ``component.Diagram``.

    The CSV is loaded with pandas, columns the parser does not need are
    dropped, and the threat, consequence and dynamics parsers fill the
    dictionaries owned by a freshly created diagram. Finally the table is
    handed to ``matrix.parseTable`` together with the diagram.

    :param csvFile: Path or file-like object accepted by ``pandas.read_csv``.
    :return: The populated diagram object.
    """
    df = pd.read_csv(csvFile)

    # Remove unnecessary data. errors="ignore" keeps the parse working when an
    # export lacks one of these columns instead of raising KeyError.
    df = df.drop(
        columns=["Shape Library", "Page ID", "Contained By", "Group", "Comments", "property 1"],
        errors="ignore",
    )

    diagram = component.Diagram()  # Defines the diagram object

    # The parse helpers fill the dictionaries they are given in place, so the
    # diagram's own containers are passed straight in.
    parseThreats(df, diagram.threats)
    parseConsequences(df, diagram.consequences)
    metrics, _ = parseDynamic(df, diagram.metrics, diagram.dynamics)

    # NOTE(review): parseAttacks is not invoked here - confirm that is intended.

    # Parse the table. The return value is not used here; presumably
    # parseTable records its result on the diagram - TODO confirm.
    matrix.parseTable(df, metrics, diagram)

    return diagram
def parseThreats(df, threatDict):
    """Create a ``component.Threat`` for every threat row and store it.

    A row is a threat when its ``const.ThreatValue`` cell equals
    ``const.Threat``. Each threat is stored in ``threatDict`` under the row's
    ``const.Id`` value.

    :param df: DataFrame holding the diagram export.
    :param threatDict: Dictionary to fill; it is mutated in place.
    :return: The same ``threatDict`` object.
    """
    # Iterate over the real index labels so the lookups below also work on a
    # frame whose index is not the default 0..n-1 range.
    for row, kind in df[const.ThreatValue].items():
        if kind != const.Threat:
            continue

        # Read the key through const.Id, the same cell handed to the
        # constructor, instead of hard-coding the column name.
        lucidId = df.at[row, const.Id]
        threatDict[lucidId] = component.Threat(
            df.at[row, const.ThreatIDValue],  # ID from ER
            lucidId,  # LucidChart ID
            df.at[row, const.ThreatSourceValue],
            df.at[row, const.ThreatDescriptionValue],
            df.at[row, const.LikelihoodValue],
            df.at[row, const.VulnerabilityValue],
        )
    return threatDict
# Parses consequences and adds them to the dictionary
def parseConsequences(df, consequenceDict):
    """Create a ``component.Consequence`` for every consequence row.

    A row is a consequence when its ``const.ConsequenceValue`` cell equals
    ``const.Consequence``. Each consequence is stored in ``consequenceDict``
    under the row's ``const.Id`` value.

    :param df: DataFrame holding the diagram export.
    :param consequenceDict: Dictionary to fill; it is mutated in place.
    :return: The same ``consequenceDict`` object.
    """
    # Iterate over the real index labels so the lookups below also work on a
    # frame whose index is not the default 0..n-1 range.
    for row, kind in df[const.ConsequenceValue].items():
        if kind != const.Consequence:
            continue

        # Read the key through const.Id, the same cell handed to the
        # constructor, instead of hard-coding the column name.
        lucidId = df.at[row, const.Id]
        consequenceDict[lucidId] = component.Consequence(
            df.at[row, const.ConsequenceIDValue],
            lucidId,
            df.at[row, const.ConsequenceDescriptionValue],
            df.at[row, const.ConsequenceScoreValue],
            df.at[row, const.AffectedComponentValue],
        )
    return consequenceDict
def parseAttacks(df, attackDict):
    """Create a ``component.Attack`` for every attack row and store it.

    A row is an attack when its ``const.AttackValue`` cell equals
    ``const.Attack``. Each attack is stored in ``attackDict`` under the row's
    ``const.Id`` value.

    :param df: DataFrame holding the diagram export.
    :param attackDict: Dictionary to fill; it is mutated in place.
    :return: The same ``attackDict`` object.
    """
    # Iterate over the real index labels so the lookups below also work on a
    # frame whose index is not the default 0..n-1 range.
    for row, kind in df[const.AttackValue].items():
        if kind != const.Attack:
            continue

        # Read the key through const.Id, the same cell handed to the
        # constructor, instead of hard-coding the column name.
        lucidId = df.at[row, const.Id]
        attackDict[lucidId] = component.Attack(
            lucidId,
            df.at[row, const.AttackedComponentValue],
            df.at[row, const.AttackDescriptionValue],
        )
    return attackDict
# Parses dynamic components and adds them to the dynamics dictionary
def parseDynamic(df, metricDict, dynamicsDict):
    """Create the dynamic components found in ``df`` and register them.

    Only rows whose ``const.textArea1`` cell equals ``const.Dynamics`` are
    considered. The ``const.textArea3`` cell selects the kind of dynamic:

    * ``const.ThreatDynamic``, ``const.ConsequenceDynamic`` and
      ``const.AttackDynamic`` produce a ``dynamic.BowtieDynamic`` that is
      associated through ``associateBowtie``.
    * ``const.ERDynamic`` produces a ``dynamic.ERDynamic`` (with the
      ``const.textArea5`` cell as description) that is associated through
      ``associatER``.

    Rows with any other ``const.textArea3`` value are skipped.

    :param df: DataFrame holding the diagram export.
    :param metricDict: Metric container passed on to the association calls.
    :param dynamicsDict: Dictionary to fill, keyed by the row's ``const.Id``
        value; it is mutated in place.
    :return: Tuple ``(metricDict, dynamicsDict)`` of the objects passed in.
    """
    # The three bowtie kinds are built and associated in exactly the same way,
    # so they share a single branch.
    bowtieKinds = (const.ThreatDynamic, const.ConsequenceDynamic, const.AttackDynamic)

    # Iterate over the real index labels so the lookups below also work on a
    # frame whose index is not the default 0..n-1 range.
    for row, area in df[const.textArea1].items():
        if area != const.Dynamics:  # Not a dynamic component
            continue

        lucidId = df.at[row, const.Id]  # Component ID LucidChart
        kind = df.at[row, const.textArea3]  # Name / type of the dynamic

        if kind in bowtieKinds:
            bowtieDynamic = dynamic.BowtieDynamic(lucidId, kind)
            # Associate the dynamic with the correct components
            bowtieDynamic.associateBowtie(df, bowtieDynamic.type, metricDict)
            dynamicsDict[lucidId] = bowtieDynamic
        elif kind == const.ERDynamic:
            erDynamic = dynamic.ERDynamic(
                lucidId,
                kind,  # Component type
                df.at[row, const.textArea5],  # Description
            )
            erDynamic.associatER(df, metricDict)
            dynamicsDict[lucidId] = erDynamic

    return metricDict, dynamicsDict
def extractMetrics(df, index, startRange):
    """Collect the metrics listed in the text areas of one row.

    Metrics are stored in pairs of columns: ``"Text Area <j>"`` holds the
    metric ID and ``"Text Area <j+1>"`` holds the metric name, for
    ``j = startRange, startRange + 2, ...``. Scanning stops at the first ID
    cell that is empty or whose column does not exist.

    :param df: DataFrame holding the diagram export.
    :param index: Index label of the row to read.
    :param startRange: Number of the first "Text Area" column to inspect.
    :return: List of ``dynamic.Metric`` objects found, in column order; empty
        when the row has no metrics.
    """
    # Created once, before the loop, so metrics accumulate across iterations
    # and an empty range still returns a list.
    listOfMetrics = []

    for j in range(startRange, len(df.columns), 2):  # Parse all text areas to find metrics
        metricID = "Text Area " + str(j)
        metricName = "Text Area " + str(j + 1)

        # The frame has more columns than text areas, so the counter can run
        # past the last "Text Area" column; treat that like an empty field.
        if metricID not in df.columns:
            break
        if pd.isnull(df[metricID][index]):
            break  # First empty field indicates no more metrics

        log.info("Metric: ID %s Name: %s", df[metricID][index], df[metricName][index])
        listOfMetrics.append(dynamic.Metric(df[metricID][index], df[metricName][index]))

    return listOfMetrics  # Returns the metrics found in the dynamic component
def joinMetrcs(localMetrics: list, globalMetrics: list) -> list:
    """Merge the metrics of one dynamic component into the global metric list.

    Every metric in ``localMetrics`` whose ``name`` is not yet present in
    ``globalMetrics`` is appended to ``globalMetrics``. Metrics whose name is
    already present (including repeats inside ``localMetrics``) are counted as
    duplicates and skipped.

    :param localMetrics: List of metrics from a dynamic component.
    :param globalMetrics: List of metrics from the entire threat landscape;
        it is extended in place.
    :return: The same ``globalMetrics`` list.
    """
    # Track known names in a set so each local metric needs one membership
    # test; metric names must therefore be hashable.
    knownNames = {metric.name for metric in globalMetrics}
    duplicateMetrics = 0  # Counter for duplicate metrics per function run

    for metric in localMetrics:
        if metric.name in knownNames:  # Local metric already exists in the global metric list
            duplicateMetrics += 1
            continue
        globalMetrics.append(metric)  # Add the local metric to the global metric list
        knownNames.add(metric.name)
        log.info("New local metric added to global metric list, metric: %s", metric.name)

    log.info(
        "Added all metrics in local list, number of duplicate metrics: %d, number of new metrics: %d",
        duplicateMetrics,
        len(localMetrics) - duplicateMetrics,
    )
    return globalMetrics