Skip to content
Snippets Groups Projects
Select Git revision
  • ecd8df9c8d30a2c30e6a2880316a6693c467dce9
  • main default protected
2 results

diagramParser.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    diagramParser.py 7.33 KiB
    import pandas as pd 
    import ERFormatConstants as const
    import components as component
    import dynamics as dynamic
    import logging as log
    import matrices as matrix
    
    
    # Function will parse a csv file and extract the necessary information, this is step 1 of the parse 
    def parseDiagramFile(csvFile) -> component.Diagram: 
        df = pd.read_csv(csvFile)    
        df.drop(["Shape Library", "Page ID", "Contained By", "Group", "Comments", "property 1"], axis=1, inplace=True) #Removing unecessary data
        
        diagram = component.Diagram()   # Defines the diagram object
        
        # List containing all threats and their descriptions 
        threats = diagram.threats
        consequences = diagram.consequences
        dynamics = diagram.dynamics
        metrics = diagram.metrics
    
        threats = parseThreats(df, threats)    
        consequences = parseConsequences(df, consequences)    
        
        metrics, dynamics = parseDynamic(df, metrics, dynamics)
        
        metricsMatrix, metrics = matrix.parseTable(df, metrics, diagram) #Parse the table
        
            
        return diagram        
            
        
    def parseThreats(df, threatDict):
        for i in range(len(df)):
            if df[const.ThreatValue][i] == const.Threat: #Creates threat object
                            
                threat = component.Threat(      
                    df[const.ThreatIDValue][i],     # ID from ER
                    df[const.Id][i],                # LucidChart ID
                    df[const.ThreatSourceValue][i], 
                    df[const.ThreatDescriptionValue][i], 
                    df[const.LikelihoodValue][i], 
                    df[const.VulnerabilityValue][i]
                    )
                threatDict[df.Id[i]] = threat
                
        return threatDict
    
    #Parses consequences and adds it to dictionary 
    def parseConsequences(df, consequenceDict):
        for i in range(len(df)):
            if df[const.ConsequenceValue][i] == const.Consequence:
                            
                consequence = component.Consequence(
                    df[const.ConsequenceIDValue][i],
                    df[const.Id][i],
                    df[const.ConsequenceDescriptionValue][i],
                    df[const.ConsequenceScoreValue][i],
                    df[const.AffectedComponentValue][i]
                    )
                consequenceDict[df.Id[i]] = consequence
                
        return consequenceDict
    
    def parseAttacks(df, attackDict):
        for i in range(len(df)):
            if df[const.AttackValue][i] == const.Attack:
                            
                attack = component.Attack(
                    df[const.Id][i],
                    df[const.AttackedComponentValue][i],
                    df[const.AttackDescriptionValue][i],
                    )
                attackDict[df.Id[i]] = attack
                
        return attackDict        
            
    #Parses metrics components and adds it to list
    def parseDynamic(df, metricDict, dynamicsDict):
        for i in range(len(df)):                                            # Iterates through the dataframe
            if df[const.textArea1][i] == const.Dynamics:                    # If the component is a dynamic component
                
                if df[const.textArea3][i] == const.ThreatDynamic:           # If the dynamic component is a threat
                    threatDynamic = dynamic.BowtieDynamic(
                        df[const.Id][i],                                    # Component ID LucidChart
                        df[const.textArea3][i]                              # Name of dynamic
                    )
                    
                    threatDynamic.associateBowtie(df, threatDynamic.type, metricDict)                  # Associate the dynamic with the correct components
                    dynamicsDict[df.Id[i]] = threatDynamic
                                    
                elif df[const.textArea3][i] == const.ConsequenceDynamic:
                    consequenceDynamic = dynamic.BowtieDynamic(
                        df[const.Id][i],                                    # Component ID LucidChart
                        df[const.textArea3][i]                              # Name of dynamic
                    )
                    
                    consequenceDynamic.associateBowtie(df, consequenceDynamic.type, metricDict)    
                    dynamicsDict[df.Id[i]] = consequenceDynamic
                    
                elif df[const.textArea3][i] == const.AttackDynamic:
                    attackDynamic = dynamic.BowtieDynamic(
                        df[const.Id][i],                                    # Component ID LucidChart
                        df[const.textArea3][i]                              # Name of dynamic
                    )
                    
                    attackDynamic.associateBowtie(df,attackDynamic.type, metricDict)                     # Associate the dynamic with the correct components
                    dynamicsDict[df.Id[i]] = attackDynamic 
                                                                       
                elif df[const.textArea3][i] == const.ERDynamic:
                    erDynamic = dynamic.ERDynamic(
                        df[const.Id][i],                                        # Component ID LucidChart  
                        df[const.textArea3][i],                                 # Component type
                        df[const.textArea5][i]                                 # Description
                    )
                    
                    erDynamic.associatER(df, metricDict)
                    dynamicsDict[df.Id[i]] = erDynamic
        return metricDict, dynamicsDict
    
    
    def extractMetrics(df, index, startRange):
        for j in range(startRange, len(df.columns),2):                   # Parse all text areas to find metrics 
            listOfMetrics = []
            metricID = "Text Area "+str(j)
            metricName = "Text Area "+str(j+1)
            
            if pd.isnull(df[metricID][index]) == False:               # If the text area is not empty
                log.info("Metric: ID", df[metricID][index], "Name: ", df[metricName][index])
                metric = dynamic.Metric(df[metricID][index], df[metricName][index])
                listOfMetrics.append(metric)
            else:
                j=0
                break                                           # First empty field indicates no more metrics
            
            return listOfMetrics                                   # Returns metric found in the dynamic component
            
        
    def joinMetrcs(localMetrics: list, globalMetrics: list) -> list:    
        """_summary_
        Function will use a local metric list and insert the local metrics into a global metric list 
        containing all the metrics in the threat landscape
        :param localMetrics: List of metrics from a dynamic component
        :param globalMetrics: List of metrics from the entire threat landscape
        """
        duplicateMetrics = 0                                    # Counter for duplicate metrics per function run
        for i in range(len(globalMetrics)):
            for j in range(len(localMetrics)):
                if globalMetrics[i].name == localMetrics[j].name: # Local metric already exists in the global metric list 
                    duplicateMetrics += 1
                    break                                         # Check next entry
                else:
                    globalMetrics.append(localMetrics[j])       # Add the local metric to the global metric list
                    log.info("New local metric added to global metric list, metric: ", localMetrics[j].name)
        log.info("Added all metrics in local list \n, number of duplicate metrics: ", duplicateMetrics, "\n Number of new metrics: ", len(localMetrics)-duplicateMetrics)
        return globalMetrics