Source code for pax_import

"""This module contains function to manipulate BIOPAX and GRAPHML files."""


__license__ = "MIT"
__docformat__ = 'reStructuredText'


import os, sys
import logging
import os.path


import json
from pprint import pprint
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import numpy as np
import graph_tool.all as gt
import pandas as pd
import traceback
 
 
from pax2graphml import utils
from pax2graphml import properties
from pax2graphml import graph_explore

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
 
[docs]def biopax_to_reaction_graph(biopax_file,graphml_file,black_list=None,control_mode=2): """Generate a reaction graph with binary interactions as a GRAPHML file from a BIOPAX file. Tte BIOPAX file is filtered, keeping only the regulation part (metabolism and genes). The process use PAXTOOLS, need a lot of memory and can be slow for big BIOPAX files: :param biopax_file: input BIOPAX file :param graphml_file: output GRAPHML reaction file :param black_list: entity black_list (e.g. hubs, h2o...) :param control_mode: control mecanism model representation, use default (2) for compressed control representation, or 1 for extended with entity duplication :return: void :rtype: None """ cls="dyliss.biopax.app.Pax2GraphML" if black_list is not None: black_listOpt=" -b %s" %(black_list) else: black_listOpt="" cmd="%s %s -cp %s %s -i '%s' -o '%s' -t '%s' -c '%s' %s" %(utils.__jvm(),utils.__jvmopt(),utils.__jarpath(),cls,biopax_file,graphml_file,utils.__template(),control_mode, black_listOpt) logger.info("command : %s" %(cmd)) return utils.__runCmd(cmd)
[docs]def biopax_merge(biopax_list,output_file="output.owl"): """Merge multiple BIOPAX (RDSF/XML) files. The process use PAXTOOLS, need a lot of memory and can be slow for big BIOPAX files: :param biopax_list: a list of input BIOPAX files to be merged :param output_file: output BIOPAX file :return: void :rtype: None """ cls="dyliss.biopax.app.Pax2Merge" biopax_filestr=",".join(biopax_list) cmd="%s %s -cp %s %s -i '%s' -o '%s'" %(utils.__jvm(),utils.__jvmopt(),utils.__jarpath(),cls,biopax_filestr,output_file) logger.info("command : %s" %(cmd)) return utils.__runCmd(cmd)
[docs]def biopax_filter(biopax_file,datasources,output_file="output.owl"): """Remove Datasources from a BIOPAX file. The process use PAXTOOLS, need a lot of memory and can be slow for big BIOPAX files: :param biopax_file: input BIOPAX files :param datasources: list of datasources to exclude :param output_file: output BIOPAX file :return: void :rtype: None """ cls="dyliss.biopax.app.BioPaxFilter" filterd=":".join(datasources) cmd="%s %s -cp %s %s %s %s %s" %(utils.__jvm(),utils.__jvmopt(),utils.__jarpath(),cls,biopax_file,filterd,output_file) logger.info("command : %s" %(cmd)) return utils.__runCmd(cmd)
[docs]def name_alias(biopax_file,output_file="entities_aliases.json",opt="--uri-ids"): """#generate a json file with annottaions extracted from a BIOPAX file. The process use PAXTOOLS : :param biopax_file: input BIOPAX files :param output_file: output json file :param opt: output generation options :return: void :rtype: None """ cls="dyliss.biopax.app.P2GAliasProcessor" cmd="%s %s -cp %s %s NAME_ALIAS '%s' '%s' %s" %(utils.__jvm(),utils.__jvmopt(),utils.__jarpath(),cls,biopax_file,output_file,opt) logger.info("command : %s" %(cmd)) return utils.__runCmd(cmd)
[docs]def annotation_dict(alias_file): """Create a dictionary from an annotation json file: :param alias_file: annotation json file :return: annotation dictionary :rtype: dict """ alias=dict() dkey="any" with open(alias_file) as jsf: ar = json.load(jsf) jsf.close() for t in ar: if t['uri'] is not None: uri=t['uri'] if t['datasource'] is not None: ds=t['datasource'] else: ds=[dkey] for px in ds: k=px.lower() +"@"+uri alias[k]=t return alias
[docs]def join_annotation(g,alias_file,annot_field, dest_field,default_val, property_type="string"): """Populate a new node property with values extracted from an annotation json file: :param properties_file: json annotation file :param annot_field: field in json data dictionary to be processed :param dest_field: new property name :param default_val: property default value for None :param property_type: new property type :return: annotation dictionary :rtype: dict """ mapd=annotation_dict(alias_file) docreate=True for p in g.vertex_properties.keys() : if p != None: if p ==dest_field: docreate=False if docreate: g.vp[dest_field]=g.new_vertex_property(property_type); uri = g.vertex_properties["uri"] provider = g.vertex_properties["provider"] nprop = g.vertex_properties[dest_field] for n in g.vertices(): pv=provider[n] ur=uri[n] if pv is None : pv="any" k=str(pv).lower()+"@"+ur if k in mapd: tupl=mapd[k] if annot_field in tupl : nprop[n]=tupl[annot_field] else: if default_val is not None: nprop[n]=default_val else: if default_val is not None: nprop[n]=default_val
############################influence graph #CREATING INFLUENCE GRAPH ##INITIATE QUANTITY NODES def __initiateQuantityNode(g): g.vp.influenceType = g.new_vertex_property("string") for v in g.vertices(): if (g.vp.entityType[v] == "reaction"): g.vp.influenceType[v]="reaction" elif (g.vp.entityType[v] == "chemical" ): g.vp.influenceType[v]="availability" qNode=g.add_vertex() #Copie des informations properties.copy_node_properties(g, v, qNode) g.vp.influenceType[qNode]="quantity" g.vp._graphml_vertex_id[qNode]=g.vp._graphml_vertex_id[v]+"Q" g.vp.color[qNode]="peru" else: g.vp.influenceType[v]="undef" return g #LINK EDGES def __linkEdges(g, displayProgress=False): reaction= gt.find_vertex(g, g.vp.influenceType, "reaction") count=0 g.ep.effect=g.new_edge_property("string") for r in reaction: if (displayProgress): logger.info(str(count)+" / "+str(len(reaction))+" edges") count+=1 source=[] target=[] catalysisBool=False for e in r.all_edges():#get all the (source, target) pairs and test if reaction is moderated or not source.append(int(e.source())) target.append(int(e.target())) if (g.ep.spaim[e] in 'aim'): catalysisBool=True #True if the reaction is moderated substrate=[] for s, t in zip(source, target): e=g.edge(s,t) ##LINK PRODUCT if (g.ep.spaim[e]=='p'): g.ep.effect[e]='1' try: logger.info("LINK PRODUCT") vid=g.vp._graphml_vertex_id vidsrc=g.vp._graphml_vertex_id[e.target()] rl= gt.find_vertex(g, vid, vidsrc+"Q" ) if len(rl)>0: rr=rl[0] sEdge = g.add_edge(r,rr ) g.ep.effect[sEdge]='1' g.ep.color[sEdge]='red' else: ed=utils.edge_description(g,e) logger.info("==>ERROR:LINK PRODUCT, no ref vertex found %s "%(ed)) except: logger.error("\nERROR IN ADDING EDGE(LINK PRODUCT) %s\n" %(traceback.format_exc() ) ) return -1 ##LINK ACTIVATOR, INHIBITOR, MODULATOR if (g.ep.spaim[e] in 'aim'): try: logger.info("LINK ACTIVATOR, INHIBITOR, MODULATOR") vid=g.vp._graphml_vertex_id vidsrc=g.vp._graphml_vertex_id[e.source()] rl= gt.find_vertex(g, vid, vidsrc+"Q" ) if len(rl)>0: rr=rl[0] #ed=utils.edge_description(g,e) #print("==>OK:LINK SUBSTRATE, ref vertex found %s "%(ed)) sEdge = g.add_edge(r,rr ) if (g.ep.spaim[e]=='a'):#activator g.ep.effect[sEdge]='1' if (g.ep.spaim[e]=='i'):#inhibitor g.ep.effect[sEdge]='-1' if (g.ep.spaim[e]=='m'):#modulator g.ep.effect[sEdge]='0' g.ep.color[sEdge]=g.ep.color[e] g.remove_edge(e) else: ed=utils.edge_description(g,e) logger.error("==>ERROR:LINK ACTIVATOR, INHIBITOR, MODULATOR, no ref vertex found %s "%(ed)) except: logger.error("\nERROR IN ADDING EDGE(LINK ACTIVATOR, INHIBITOR, MODULATOR) %s\n" %(traceback.format_exc() ) ) return -1 #### ##LINK SUBSTRATE if (g.ep.spaim[e] in 's'): g.ep.effect[e]='1' if (catalysisBool):#ie if the reaction is moderated try: logger.info("LINK SUBSTRATE") vid=g.vp._graphml_vertex_id vidsrc=g.vp._graphml_vertex_id[e.source()] rl= gt.find_vertex(g, vid, vidsrc+"Q" ) if len(rl)>0: rr=rl[0] #ed=utils.edge_description(g,e) #print("==>OK:LINK SUBSTRATE, ref vertex found %s "%(ed)) sEdge = g.add_edge(r,rr ) g.ep.effect[sEdge]='-1' g.ep.color[sEdge]='blue' else: ed=utils.edge_description(g,e) logger.info("==>ERROR:LINK SUBSTRATE, no ref vertex found %s "%(ed)) except: logger.error("\nERROR IN ADDING EDGE(LINK SUBSTRATE) %s\n" %(traceback.format_exc() ) ) return -1 else: substrate.append(e.source()) ##LINK SUBSTRATE (MODERATED REACTION) for sub in range(len(substrate)):#for each reaction that is not moderated, link by a down-regulation substrates' availabilities to other substrates' quantities substrateTamp=list(np.copy(substrate)) substrateTamp.pop(sub) for v in substrateTamp: try: logger.info("LINK SUBSTRATE (MODERATED REACTION)") vid=g.vp._graphml_vertex_id vidsrc=g.vp._graphml_vertex_id[v] rl= gt.find_vertex(g, vid, vidsrc+"Q" ) if len(rl)>0: rr=rl[0] #sEdge = g.add_edge(r,rr ) sEdge = g.edge(substrate[sub], rr, add_missing=True ) g.ep.effect[sEdge]='-1' g.ep.color[sEdge]='skyblue' else: ed=utils.edge_description(g,e) sbiopaxType=g.vp.biopaxType[e.source()] tbiopaxType=g.vp.biopaxType[e.target()] if sbiopaxType=="DnaRegion" and tbiopaxType=="ComplexAssembly": msg="WARNING:LINK SUBSTRATE(MODERATED REACTION),unmanaged case for quantity %s -> %s" %(sbiopaxType, tbiopaxType ) else: msg="ERROR:LINK SUBSTRATE(MODERATED REACTION), no ref vertex found" logger.info("==>%s %s "%(msg,ed)) except: logger.error("\nERROR IN ADDING EDGE(LINK SUBSTRATE (MODERATED REACTION) %s\n" %(traceback.format_exc() ) ) return -1 g=gt.Graph(g, prune=True) return g
[docs]def reaction_to_influence_graph(reaction_graph): """generate an influence graph from a checked reaction graph: :param reaction_graph: checked reaction graph :return: influence graph :rtype: graph object """ infG=reaction_graph.copy() __initiateQuantityNode(infG) __linkEdges(infG) return infG
[docs]def prepare_spaim(input_graph,output_graph,output_image,checkInvertP=True): """generate an checked reaction graph from a raw reaction graph: :param input_graph: input graphml file containing the raw reaction graph :param output_graph: output graphml file containing the checked reaction graph :return: void :rtype: None """ g=graph_explore.load_graphml(input_graph) g=__setEntityType(g) #for v in g.vertices(): # print(g.vp.name[v].lower()) anormalConnections=__checkConnections(g) anormalNodes=np.reshape(anormalConnections, -1) #ADHOC INCLUSION logger.info("\nAdhoc Inclusion Attempt...") for v in anormalNodes: undefAnormalButReaction=['TRANSCRIPTION', 'INHIBITION', 'PHYSICAL_STIMULATION', 'CATALYSIS', 'MODULATION', 'STATE_TRANSITION'] g.vp.biopaxType[v]='UNDEF_CHEMICAL_ENTITY' g.vp.entityType[v]="chemical" #print("hey") for bpType in undefAnormalButReaction: #print(g.vp.name[v].lower()) nm=g.vp.name[v].upper() if (bpType in nm): logger.info("-fix abnormal reaction %s " %(nm)) g.vp.biopaxType[v]=bpType g.vp.entityType[v]="reaction" if checkInvertP==True: #INVERT PRODUCT EDGES logger.info("\ncheck if need to invert product edges...") source=[] target=[] try: g.ep.color=g.new_edge_property("string") except: logger.error("\nedge color already exists") for e in g.edges(): if (g.ep.spaim[e]=='p' and g.vp.entityType[e.source()]!="reaction" and g.vp.entityType[e.target()]=="reaction" ): ed=utils.edge_description(g,e) logger.info("-inversion from %s -%s-> %s --[ %s ]--" %(e.source(),g.ep.spaim[e],e.target(), ed )) source.append(int(e.source())) target.append(int(e.target())) for s, t in zip(source, target): e=g.edge(s,t) newEdge=g.add_edge(t, s) g.ep.color[newEdge]=g.ep.color[e] g.ep.interaction[newEdge]=g.ep.interaction[e] g.ep.spaim[newEdge]=g.ep.spaim[e] g.remove_edge(e) g=gt.Graph(g, prune=True) __undefNodesAndEdges(g) graph_explore.color_nodes(g) graph_explore.color_edges(g) if output_graph is not None: graph_explore.save_graphml(g, output_graph) if output_image is not None: graph_explore.save_image(g, output_image)
[docs]def influence_subgraph(input_graph,output_graph,output_image,min_node,max_node): """Generate an influence graph as a graphml file from a checked reaction graphml file. The graph is generated from one connected component: :param input_graph: input graphml file containing the raw reaction graph :param output_graph: output graphml file containing the checked reaction graph :param output_image: output png file for graph visualization :param min_node: minimum node count of tne connected component :param max_node: maximum node count of tne connected component :return: void :rtype: None """ g=graph_explore.load_graphml(input_graph, directed=True) g=utils.cc_by_node_count(g,min_node,max_node) if g is not None: g = reaction_to_influence_graph(g) graph_explore.__saveOrPlot(g, output_graph, output_image) return g else: return None
[docs]def influence_graph(input_graph,output_graph,output_image): """ generate an influence graph as a graphml file from a checked reaction graphml file :param input_graph: input graphml file containing the raw reaction graph :param output_graph: output graphml file containing the checked reaction graph :return: void :rtype: None """ g=graph_explore.load_graphml(input_graph, directed=True) g = reaction_to_influence_graph(g) graph_explore.__saveOrPlot(g, output_graph, output_image)
def __setEntityType(g): """ set the entity type node property {REACTION, CHEMICAL} """ g.vp.entityType = g.new_vertex_property("string") for v in g.vertices(): if (g.vp.biopaxType[v] in ('undef','ComplexAssembly','Catalysis','BiochemicalReaction','Control','Interaction', 'TRANSCRIPTION', 'INHIBITION', 'PHYSICAL_STIMULATION', 'CATALYSIS', 'MODULATION', 'STATE_TRANSITION')): g.vp.entityType[v]="reaction" elif (g.vp.biopaxType[v] in ('Complex', 'Dna', 'PhysicalEntity', 'Protein', 'Rna', 'SmallMolecule','UNDEF_CHEMICAL_ENTITY')): g.vp.entityType[v]="chemical" else: g.vp.entityType[v]="undef" return g def __undefNodesAndEdges(g): """ return undef enity type nodes and undef spaim type edges """ undefTypeNodes=[] undefTypeEdges=[] for v in g.vertices(): if (g.vp.entityType[v]=='undef'): undefTypeNodes.append(v) for e in g.edges(): if (g.ep.spaim[e]=='undef'): undefTypeEdges.append((int(e.source()), int(e.target()))) logger.info("\nnumber of undef entityType nodes: "+str(len(undefTypeNodes))) logger.info("number of undef spaim edges: "+str(len(undefTypeEdges))) return (undefTypeNodes, undefTypeEdges) #CHECK IF A REACTION IS LINKED TO A REACTION OR A CHEMICAL IS LINKED TO A CHEMICAL OR A UNDEF IS LINKED TO A UNDEF def __checkConnections(g): """ Connections validation function. Check if a reaction is linked to a reaction or a chemical is linked to a chemical or an undef is linked to an undef """ anormalConnections=[] try : g.vp.entityType[0] except: logger.error("\nERROR: ENTITY TYPE PROPERTY NOT FOUND\n") return -1 for e in g.edges(): if (g.vp.entityType[e.source()]==g.vp.entityType[e.target()]): anormalConnections.append((int(e.source()), int(e.target()))) logger.info("\nnumber of anormal connections: "+str(len(anormalConnections))) return anormalConnections