Skip to content

Commit

Permalink
Merge pull request #14 from nakib103/fix_freq
Browse files Browse the repository at this point in the history
bugFix: API break when frequency not present and update according to schema
  • Loading branch information
nakib103 authored Sep 11, 2023
2 parents acf6676 + 198971d commit 63f2b39
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 84 deletions.
203 changes: 127 additions & 76 deletions common/file_model/variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,23 @@ def get_alternative_names(self) -> List:
return []

def get_primary_source(self) -> Mapping:
source = self.header.get_lines("source")[0].value
if re.search("^dbSNP", source):
source_id = "dbSNP"
source_name = "dbSNP"
source_description = "NCBI db of human variants"
source_url = "https://www.ncbi.nlm.nih.gov/snp/"
source_release =154

elif re.search("^ClinVar", source):
source_id = "ClinVar"
source_name = "ClinVar"
source_description = "ClinVar db of human variants"
source_url = "https://www.ncbi.nlm.nih.gov/clinvar/variation/"
source_release = ""

else:
try:
source = self.header.get_lines("source")[0].value
if re.search("^dbSNP", source):
source_id = "dbSNP"
source_name = "dbSNP"
source_description = "NCBI db of human variants"
source_url = "https://www.ncbi.nlm.nih.gov/snp/"
source_release =154

elif re.search("^ClinVar", source):
source_id = "ClinVar"
source_name = "ClinVar"
source_description = "ClinVar db of human variants"
source_url = "https://www.ncbi.nlm.nih.gov/clinvar/variation/"
source_release = ""

except:
source_id = "test"
source_name = "test"
source_description = "test db of human variants"
Expand Down Expand Up @@ -148,7 +149,10 @@ def get_alleles(self) -> List:
variant_allele_list = []
info_map = self.traverse_csq_info()

frequency_map = self.format_frequency(",".join(map(str,self.info["FREQ"])).split("|")) if self.info["FREQ"] else {}
frequency_map = {}
if "FREQ" in self.info:
frequency_map = self.format_frequency(",".join(map(str,self.info["FREQ"])).split("|"))

for index,alt in enumerate(self.alts):
if index+1 <= len(self.alts):
variant_allele = self.create_variant_allele(info_map, frequency_map, index+1, alt.value)
Expand All @@ -170,14 +174,15 @@ def create_variant_allele(self, info_map: Mapping, frequency_map: List, allele_i
"name": name,
"allele_sequence": alt,
"reference_sequence": self.ref,
"alternative_names": self.get_alternative_names(),
"type": "VariantAllele",
"allele_type": self.get_allele_type(alt),
"slice": self.get_slice(alt),
"phenotype_assertions": info_map[min_alt]["phenotype_assertions"] if min_alt in info_map else [],
"predicted_molecular_consequences": info_map[min_alt]["predicted_molecular_consequences"] if min_alt in info_map else [],
"prediction_results": info_map[min_alt]["prediction_results"] if min_alt in info_map else [],
"population_frequencies": self.get_population_allele_frequencies(frequency_map, allele_index)
}


def format_frequency(self, raw_frequency_list: List) -> Mapping:
freq_map = {}
Expand All @@ -192,18 +197,14 @@ def get_population_allele_frequencies(self, population_map: Mapping, allele_inde
for key, pop_list in population_map.items():
## Adding only GnomAD population
if key == "GnomAD":
if pop_list[allele_index] != "None":
allele_frequency = pop_list[allele_index]
else:
allele_frequency = None
population_allele_frequencies.append({
"population": key,
"allele_frequency": pop_list[allele_index],
"is_minor_allele": False,
"is_hpmaf": False
})
if pop_list[allele_index] not in ["None", "."]:
population_allele_frequencies.append({
"population": key,
"allele_frequency": pop_list[allele_index],
"is_minor_allele": False,
"is_hpmaf": False
})
return population_allele_frequencies


def set_frequency_flags(self, allele_list: List):
"""
Expand All @@ -219,8 +220,8 @@ def set_frequency_flags(self, allele_list: List):
highest_maf_frequency_index = -1
maf_map = {}
for allele_index, allele in enumerate(allele_list):
if(len(allele["population_frequencies"]) > 0 ):
pop = allele["population_frequencies"][0]
if(len(allele["population_frequencies"]) > 0):
pop = allele["population_frequencies"][0]
pop_allele_frequency = float(pop["allele_frequency"])
if ( pop_allele_frequency > maf_frequency and pop_allele_frequency < highest_frequency ):
maf_frequency = pop_allele_frequency
Expand All @@ -234,9 +235,6 @@ def set_frequency_flags(self, allele_list: List):
if maf_frequency>=0:
allele_list[maf_index]["population_frequencies"][0]["is_minor_allele"] = True
allele_list[maf_index]["population_frequencies"][0]["is_hpmaf"] = True




def get_info_key_index(self, key: str, info_id: str ="CSQ") -> int:
info_field = self.header.get_info_field_info(info_id).description
Expand Down Expand Up @@ -276,7 +274,6 @@ def minimise_allele(self, alt: str):
elif len(alt) < len(self.ref):
minimised_allele = "-"
return minimised_allele


def traverse_csq_info(self) -> Mapping:
"""
Expand All @@ -292,6 +289,7 @@ def traverse_csq_info(self) -> Mapping:
consequence_index = self.get_info_key_index("Consequence")
spdi_index = self.get_info_key_index("SPDI")
cadd_index = self.get_info_key_index("CADD_PHRED")
gerp_index = self.get_info_key_index("Conservation")
info_map = {}
for csq_record in self.info["CSQ"]:
csq_record_list = csq_record.split("|")
Expand All @@ -306,21 +304,31 @@ def traverse_csq_info(self) -> Mapping:
csq_record_list[feature_type_index],
csq_record_list[consequence_index],
csq_record_list[sift_index],
csq_record_list[polyphen_index],
csq_record_list[cadd_index]
csq_record_list[polyphen_index]
))
current_prediction_results = info_map[csq_record_list[allele_index]]["prediction_results"]
info_map[csq_record_list[allele_index]]["prediction_results"] += self.create_allele_prediction_results(current_prediction_results,
csq_record_list[spdi_index],
csq_record_list[cadd_index],
csq_record_list[gerp_index]
)

else:
info_map[csq_record_list[allele_index]] = {"phenotype_assertions": [], "predicted_molecular_consequences": []}
info_map[csq_record_list[allele_index]] = {"phenotype_assertions": [], "predicted_molecular_consequences": [], "prediction_results": []}
if phenotype:
info_map[csq_record_list[allele_index]]["phenotype_assertions"].append(self.create_allele_phenotype_assertion(csq_record_list[feature_index], csq_record_list[feature_type_index], phenotype))
info_map[csq_record_list[allele_index]]["predicted_molecular_consequences"].append(self.create_allele_predicted_molecular_consequence(csq_record_list[spdi_index],
csq_record_list[feature_index],
csq_record_list[feature_type_index],
csq_record_list[consequence_index],
csq_record_list[sift_index],
csq_record_list[polyphen_index],
csq_record_list[cadd_index]
csq_record_list[polyphen_index]
))
info_map[csq_record_list[allele_index]]["prediction_results"] += self.create_allele_prediction_results([], csq_record_list[spdi_index],
csq_record_list[cadd_index],
csq_record_list[gerp_index]
)

return info_map

def create_allele_phenotype_assertion(self, feature: str, feature_type: str , phenotype: str) -> Mapping:
Expand All @@ -336,11 +344,37 @@ def create_allele_phenotype_assertion(self, feature: str, feature_type: str , ph

}

def create_allele_predicted_molecular_consequence(self, allele: str, feature: str, feature_type: str, consequences: str, sift_score: str, polyphen_score: str, cadd_score: str ) -> Mapping:
def format_sift_polyphen_output(self, output: str) -> tuple:
try:
(result, score) = re.split(r"[()]", output)[:2]
except:
return (None, None)

if result not in [
'probably damaging',
'possibly damaging',
'benign',
'unknown',
'tolerated',
'deleterious',
'tolerated - low confidence',
'deleterious - low confidence',
]:
result = None

try:
score = float(score)
except:
# need to log something here
score = None

return (result, score)


def create_allele_predicted_molecular_consequence(self, allele: str, feature: str, feature_type: str, consequences: str, sift_score: str, polyphen_score: str) -> Mapping:
"""
This needs to be designed better, currently all the scores come as args
Steve suggested that we add prediction results per Gene instead of transcript
Currently, CADD returns empty
"""
consequences_list = []
for cons in consequences.split("&"):
Expand All @@ -349,36 +383,32 @@ def create_allele_predicted_molecular_consequence(self, allele: str, feature: st
"accession_id": cons
}
)
prediction_results = []

if cadd_score:
cadd_prediction_result = {
"result": cadd_score ,
prediction_results = []
if sift_score:
(result, score) = self.format_sift_polyphen_output(sift_score)
if result is not None and score is not None:
sift_prediction_result = {
"result": result,
"score": score,
"analysis_method": {
"tool": "CADD",
"qualifier": "CADD"
"tool": "SIFT",
"qualifier": "SIFT"
}

}
prediction_results.append(cadd_prediction_result)
if sift_score:
sift_prediction_result = {
"result": sift_score ,
"analysis_method": {
"tool": "SIFT",
"qualifier": "SIFT"
}
}
prediction_results.append(sift_prediction_result)
prediction_results.append(sift_prediction_result)
if polyphen_score:
polyphen_prediction_result = {
"result": polyphen_score ,
"analysis_method": {
"tool": "PolyPhen",
"qualifier": "PolyPhen"
(result, score) = self.format_sift_polyphen_output(polyphen_score)
if result is not None and score is not None:
polyphen_prediction_result = {
"result": result,
"score": score,
"analysis_method": {
"tool": "PolyPhen",
"qualifier": "PolyPhen"
}
}
}
prediction_results.append(polyphen_prediction_result)
prediction_results.append(polyphen_prediction_result)


return {
Expand All @@ -391,17 +421,38 @@ def create_allele_predicted_molecular_consequence(self, allele: str, feature: st
"prediction_results": prediction_results
}

def prediction_result_already_exists(self, current_prediction_results: Mapping, tool: str) -> bool:
for prediction_result in current_prediction_results:
if prediction_result["analysis_method"]["tool"] == tool:
return True

return False

def create_allele_prediction_results(self, current_prediction_results: Mapping, allele: str, cadd_score: str, gerp_score: str) -> list:
"""
This needs to be designed better, currently all the scores come as args
"""
prediction_results = []
if cadd_score:
if not self.prediction_result_already_exists(current_prediction_results, "CADD"):
cadd_prediction_result = {
"result": cadd_score ,
"analysis_method": {
"tool": "CADD",
"qualifier": "CADD"
}









}
prediction_results.append(cadd_prediction_result)
if gerp_score:
if not self.prediction_result_already_exists(current_prediction_results, "GERP"):
gerp_prediction_result = {
"result": gerp_score ,
"analysis_method": {
"tool": "GERP",
"qualifier": "GERP"
}
}
prediction_results.append(gerp_prediction_result)



return prediction_results
4 changes: 2 additions & 2 deletions common/schemas/variant.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ type Variant {
Variant
"""
name: String!
alternative_names: [ExternalReference]
alternative_names: [ExternalReference]!
primary_source: ExternalReference!
type: String!
allele_type: OntologyTermMetadata!
slice: Slice!
alleles: [VariantAllele]!
prediction_results: [PredictionResult]
prediction_results: [PredictionResult]!
}
11 changes: 5 additions & 6 deletions common/schemas/variant_allele.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ type VariantAllele {
name: String!
allele_sequence: String!
reference_sequence: String!
alternative_names: [ExternalReference]
alternative_names: [ExternalReference]!
type: String!
allele_type: OntologyTermMetadata!
slice: Slice!
phenotype_assertions: [PhenotypeAssertion]
prediction_results: [PredictionResult]
population_frequencies: [PopulationAlleleFrequency]
predicted_molecular_consequences: [PredictedMolecularConsequence]

phenotype_assertions: [PhenotypeAssertion]!
prediction_results: [PredictionResult]!
population_frequencies: [PopulationAlleleFrequency]!
predicted_molecular_consequences: [PredictedMolecularConsequence]!
}

0 comments on commit 63f2b39

Please sign in to comment.