Source code for extract

"""This module contains function to extract graph and connected components from a graph in graphml and assemble such graphs"""


__license__ = "MIT"
__docformat__ = 'reStructuredText'



import sys
import logging
import copy
import pandas as pd
import numpy as np 

from graph_tool.all import random_graph, label_components
import graph_tool.all as gt

from pybiomart import  Server, Dataset
from pybiomart.dataset import   Filter

 
from pax2graphml import utils, properties, graph_explore
 
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

[docs]def sub_graph_by_value(g,targets,annot_keys,add_void=False,void_symbol=None): """Extract a subgraph list. Each graph contains nodes with properties matching provided values: :param g: a graph :param targets: a list of values that must match the values of the properties defined by annot_keys :param annot_keys: a list of properties :param add_void: a boolean defining if we keeo void values :param void_symbol: void_symbol represents the void value symbol, used if add_void is true :return: a list of dictionnary where the key "subgraph" represents the subgraph :rtype: list """ extend_to_cc=False subgraphList=list() for tar in targets: pvset=set() if add_void==True: if void_symbol is not None and tar != void_symbol: pvset.add(void_symbol) pvset.add(tar) else: pvset.add(tar) pvlist=list(pvset) if len(pvlist)>0: add_voidCC=True sg,targmap=connected_component_by_annotation(g,pvlist,annot_keys,add_voidCC,extend_to_cc) d=dict() d["datasource"]=pvlist d["subgraph"]=sg d["ccomponent"]=targmap subgraphList.append(d) return subgraphList
[docs]def subgraphs_by_datasource(g,add_void=False,void_symbol=""): """Extract a subgraph list. Each graph contains nodes with datasource/provider matching input values: :param g: a graph :param add_void: a boolean defining if we keep void values :param void_symbol: void_symbol represents the void value symbol, used if add_void is true :return: a list of dictionnary where the key "subgraph" represents the subgraph :rtype: list """ if g is None: logger.info("warning graph is None") annot_key="provider" providerList=properties.property_values(g,annot_key) annot_keys=[annot_key] return sub_graph_by_value(g,providerList,annot_keys,add_void,void_symbol)
[docs]def connected_component_by_annotation(g,targ,annot_keys,add_void=False,extend_to_cc=True): """Extract a subgraph build by merging a subset of connected components of the original graph. Each connected component contains nodes with properties matching provided values: :param g: a graph :param targ: a list of values that must match the values of the properties defined by annot_keys :param annot_keys: a list of properties :param add_void: a boolean defining if we keeo void values.void values will have the string value "" :param extend_to_cc: if true add all members of a connectes compoenent which, \ at least one node matches a properties value defined in targ :return: a subgraph :rtype: graph """ targmap=dict() targmapcc=dict() revnidmap=dict() prop2nid=dict() selectedcc=dict() for annot_key in annot_keys: pkprop= g.vertex_properties[annot_key] nid=0 vfilt = g.new_vertex_property('bool'); for v in g.vertices(): vfilt[nid] = False xid=pkprop[nid] revnidmap[xid]=nid if xid is not None and xid !="": utils.__addArrayEl(prop2nid,xid,nid) else: if add_void==True: if xid is None: xid="" utils.__addArrayEl(prop2nid,xid,nid) nid+=1 if extend_to_cc==True: ccomp, hist = label_components(g, directed=None, attractors=False) mccid=dict() if targ!=None: for nml in targ: nm=nml ccidl=set() if nm in prop2nid.keys(): for nid in prop2nid[nm]: v= g.vertex(nid) vfilt[nid] = True if extend_to_cc==True: ccid=ccomp[v] ccidl.add(ccid) selectedcc[ccid]=1 else: nms=nml.split(";") for nm in nms: if nm in prop2nid.keys(): for nid in prop2nid[nm]: v= g.vertex(nid) vfilt[nid] = True if extend_to_cc==True: ccid=ccomp[v] ccidl.add(ccid) selectedcc[ccid]=1 targmap[nml]=list(ccidl) if extend_to_cc==True: for v in g.vertices(): nid=int(v) ccid=ccomp[v] ccidct=0 if ccid in mccid.keys(): ccidct=mccid[ccid] ccidct+=1 mccid[ccid]=ccidct if ccid in selectedcc.keys(): vfilt[nid] = True sgv = gt.GraphView(g, vfilt=vfilt) sg = gt.Graph(sgv, prune=True) return sg,targmap
[docs]def define_boolean_filter(gr,att,val, usecase=True): """Create a boolean filter property: :param gr: a graph :param att: a existing property name :param val: a value for the property att. Each node with this property value will be selected by the filter. Optionaly, val can be a list of values :param usecase:if False, when val is a string, upper or lower strings will match :return: a filter as a dictionnary (node index,True/False) :rtype: dict """ vfilter = gr.new_vertex_property("bool"); lval =list() if type(val) is list: for v in val: lval.append(v) else: lval.append(val) for v in gr.vertices(): cval=gr.vp[att][v] vindex=gr.vertex_index[v] domatch=0 for vl in lval: if cval is not None: if usecase ==False: cval=cval.lower() vl=vl.lower() if cval==vl: domatch=1 if domatch==1 : vfilter[vindex] = True else: vfilter[vindex] = False return vfilter
[docs]def filter_from_boolean_filter(gr,vfilter): """Create a subgraph using a filter: :param gr: a graph :param vfilter: a filter as a dictionnary (node index,True/False) :return: a subgraph :rtype: graph """ u = gt.GraphView(gr, vfilt=vfilter) u=gt.Graph(u, prune=True) return u
#filter a graph using a node property ( V2 with lambda)
[docs]def filter_by_node_attribute(gr,att,val): """Create a subgraph where the nodes matches a property value: :param gr: a graph :param att: an existing property name :param val: a value for the property att. Each node with this property value will be selected by the filter :return: a subgraph :rtype: graph """ u=gt.GraphView(gr, vfilt=lambda v: gr.vp[att][v]==val) u=gt.Graph(u, prune=True) if u is None: logger.info("warning graph is None") return u
[docs]def largest_connected_component(g, directed=False): """Select the largest connected component: :param g: a graph :return: a subgraph :rtype: graph """ lCCfilt = gt.label_largest_component(g, directed=directed) lCC = gt.GraphView(g, vfilt=lCCfilt) lCC=gt.Graph(lCC, prune=True) logger.info("\nlargest CC order: %s" %len(list(lCC.vertices()))) logger.info("largest CC size: %s" %len(list(lCC.edges()))) return lCC
[docs]def remove_largest_cc(g, directed=False): """Remove the largest connected component: :param g: a graph :return: a subgraph :rtype: graph """ woLCCfilt = gt.label_largest_component(g, directed=directed) for i in range(len(woLCCfilt.a)): woLCCfilt.a[i]= 1-woLCCfilt.a[i] woLCC = gt.GraphView(g, vfilt=woLCCfilt) woLCC=gt.Graph(woLCC, prune=True) logger.info("graph without largest CC order: %s" %len(list(woLCC.vertices()))) logger.info("graph without largest CC size: %s" %len(list(woLCC.edges()))) return woLCC
###########################
[docs]def merge_nodes(gr,first_node,second_node,remove_node=True): """Merge two nodes of a graph, presserving edges and properties: :param gr: a graph :param first_node: first node to ne merged :param second_node: second node to ne merged :param remove_node: if True, remove first_node :return: the modified graph :rtype: graph """ properties.copy_node_properties(gr,first_node,second_node) for e in first_node.in_edges(): ee=gr.edge(e.source(),second_node) if ee is None: newEdge=gr.add_edge(e.source(),second_node) properties.copy_edge_properties(gr, e, newEdge) for e in first_node.out_edges(): ee=gr.edge(second_node,e.target()) if ee is None: newEdge=gr.add_edge(second_node,e.target()) properties.copy_edge_properties(gr, e, newEdge) earr=list() for e in first_node.in_edges(): earr.append(e) for e in first_node.out_edges(): earr.append(e) for e in earr: gr.remove_edge(e) if remove_node==True: gr.remove_vertex( first_node)
def __merging_property(gr, properties_list,caseSensitive=True): """Define a merging property for all nodes of a graph, using a concatenation of the values of each property of a list: :param gr: a graph :param properties_list: a list of node properties :param caseSensitive: if True, the value match is case sensitive :return: a property :rtype: dict """ spropList=list() sep=":" gprop = gr.new_vertex_property("string") for sprop in properties_list: for prop in gr.vertex_properties.keys(): if sprop==prop: spropList.append(sprop) for v in gr.vertices(): gvp=list() for sprop in spropList: pv=str(gr.vp[sprop][v]) gvp.append(pv) fval=sep.join(gvp) if caseSensitive==False: fval=fval.lower() gprop[v]=fval return gprop
[docs]def merge_graph(gr1,gr2,properties_list,add_void=False,caseSensitive=True): """Merge two graphs. all nodes of both graphs, that share the same value of a list of properties are merged: :param gr1: first graph :param gr2: second graph :param properties_list: a list of node properties :param add_void: if True, nodes with void values are merged :param caseSensitive: if True, the value match is case sensitive :return: the merged graph :rtype: graph """ gprop1=__merging_property(gr1, properties_list,caseSensitive) gprop2=__merging_property(gr2, properties_list,caseSensitive) nnodeMap=dict() domergeMap=dict() sep=";" rprop = gr1.new_vertex_property("int") for u in gr1.vertices(): keyu=gprop1[u] rprop[u]=0 for v in gr2.vertices(): keyv=gprop2[v] domerge=False if keyu is None or keyv is None or keyu=="": if add_void==True and keyu==keyv: domerge=True else: kvu=keyu.split(sep) kvv=keyv.split(sep) for euv in kvu: for evv in kvv: if euv==evv: domerge=True break if int(v) not in nnodeMap: second_node=gr1.add_vertex() nnodeMap[int(v)]=second_node for prop in gr1.vertex_properties.keys(): gr1.vp[prop][second_node]=gr2.vp[prop][v] else: second_node=nnodeMap[int(v)] if domerge==True: logger.info(utils.node_to_string(gr1, second_node, " , ")) ki=int(second_node) if ki in domergeMap: kset=domergeMap[ki] else: kset=set() vu=int(u) kset.add(vu) domergeMap[ki]=kset rprop[u]=1 for v in gr2.vertices(): second_node=nnodeMap[int(v)] for e in v.in_edges(): source=nnodeMap[int(e.source())] ee=gr1.edge(source,second_node) if ee is None: newEdge=gr1.add_edge(source,second_node) properties.copy_edge_properties(gr1, e, newEdge) for e in v.out_edges(): target=nnodeMap[int(e.target())] ee=gr1.edge(second_node,target) if ee is None: newEdge=gr1.add_edge(second_node,target) properties.copy_edge_properties(gr1, e, newEdge) nmap=dict() for n in gr1.vertices(): nid=int(n) nmap[nid]=n for nid in nmap.keys(): second_node=nmap[nid] if nid in domergeMap.keys(): ulist=list(domergeMap[nid]) for ix in ulist: u=nmap[ix] merge_nodes(gr1,u,second_node,False) narr=list() for nid in nmap.keys(): u=nmap[nid] if rprop[u] is not None and rprop[u]==1: logger.info(" -->>>>"+utils.node_to_string(gr1, u, " , ")) narr.append(u) if len(narr)>0: gr1.remove_vertex(narr) return gr1
[docs]def merge_node_by_property(gr1,properties_list,add_void=False,caseSensitive=True): """Merge all nodes of a graph, that share the same value of a list of properties: :param gr: a graph :param properties_list: a list of node properties :param caseSensitive: if True, the value match is case sensitive :return: the modified graph :rtype: graph """ gprop1=__merging_property(gr1, properties_list,caseSensitive) nnodeMap=dict() domergeMap=dict() sep=";" rprop = gr1.new_vertex_property("int") for u in gr1.vertices(): keyu=gprop1[u] rprop[u]=0 for v in gr1.vertices(): keyv=gprop1[v] nnodeMap[int(v)]=v domerge=False if keyu is None or keyv is None or keyu=="": if add_void==True and keyu==keyv: domerge=True else: kvu=keyu.split(sep) kvv=keyv.split(sep) for euv in kvu: for evv in kvv: if euv==evv: domerge=True break if domerge==True: second_node=u logger.info(utils.node_to_string(gr1, second_node, " , ")) ki=int(second_node) if ki in domergeMap: kset=domergeMap[ki] else: kset=set() vv=int(v) if vv!=ki: kset.add(vv) domergeMap[ki]=kset rprop[v]=1 nmap=dict() for n in gr1.vertices(): nid=int(n) nmap[nid]=n # rmm=dict() for nid in nmap.keys(): v=nmap[nid] if nid in domergeMap.keys(): ulist=list(domergeMap[nid]) for ix in ulist: u=nmap[ix] logger.info("-v--%s "%(int(v) )) if int(u)!=int(v): st=set() st.add(str(ix)) st.add(str(nid)) k=",".join(sorted(st)) if k not in rmm.keys(): rmm[k]=1 logger.info("-v u--%s %s "%(int(v),int(u))) rprop[u]=2 merge_nodes(gr1,u,v,False) narr=list() for nid in nmap.keys(): u=nmap[nid] if rprop[u] is not None and rprop[u]==2: logger.info(" -->>>>"+utils.node_to_string(gr1, u, " , ")) narr.append(u) if len(narr)>0: gr1.remove_vertex(narr) return gr1
[docs]def sub_graph_filter(g, iteration_count, central_node, direction=all, node_limit=None, neighbour_count=None): """define a subgraph filter according to parameters: :param gr: a graph :param iteration_count: number of iterations :param central_node: selected node id :param direction: edge direction (all,in,out) :param node_limit: maximum node number :param neighbour_count: number of neighbours :return: the filter :rtype: dict """ vSubFilt=np.zeros(len(list(g.vertices()))) neighbourhood=[np.int64(central_node)] newNeighbours=neighbourhood for i in range(1, int(iteration_count)+1): newNeighboursTamp=newNeighbours newNeighbours=[] if (direction is None or str(direction)=="all"): for v in newNeighboursTamp: for n in g.get_all_neighbours(v): if (n not in neighbourhood): newNeighbours.append(n) elif (str(direction)=="in"): for v in newNeighboursTamp: for n in g.get_in_neighbours(v): if (n not in neighbourhood): newNeighbours.append(n) elif (str(direction)=="out"): for v in newNeighboursTamp: for n in g.get_out_neighbours(v): if (n not in neighbourhood): newNeighbours.append(n) else: logger.info("\nERROR: DIRECTION UNDEFINED (must be {all, in, out})\n") return -1 if (newNeighbours==[]): logger.info("\nBREAK: NO NEW NEIGHBOURS AT ITERATION %s\n" %i) break if (neighbour_count is not None):#choose randomly a given number (if not none) of neighbours newNeighbours=np.random.choice(newNeighbours, int(neighbour_count)) newNeighbours=np.unique(newNeighbours) neighbourhood=np.concatenate((neighbourhood, newNeighbours)) if (node_limit is not None and len(neighbourhood)>= int(node_limit)):#break if cardinality is over the limit logger.info("\nBREAK: CARDINALITY REACHED THE GIVEN LIMIT\n") break for l in np.int64(neighbourhood):#build filter vSubFilt[l]=1 return vSubFilt
[docs]def subgraph_by_direction(g, iteration_count, chosen_node_id=None, direction="all", node_limit=None, neighbour_count=None): """extract a subgraph according to parameters: :param gr: a graph :param iteration_count: number of iterations :param chosen_node_id: selected node id :param direction: edge direction (all,in,out) :param node_limit: maximum node number :param neighbour_count: number of neighbours :return: the subgraph :rtype: graph """ if (chosen_node_id is not None): central_node=gt.find_vertex(g, g.vp._graphml_vertex_id, chosen_node_id)[0] else: central_node=np.random.choice(g.get_vertices()) logger.info("\nCentral Node Name: "+str(g.vp.name[central_node])) logger.info("Central Node Id: "+str(g.vp._graphml_vertex_id[central_node])) logger.info("Central Node Degree: "+str(g.get_total_degrees([central_node]))) vSubFilt=sub_graph_filter(g, iteration_count, central_node, direction, node_limit, neighbour_count) subG = gt.GraphView(g, vfilt=vSubFilt) subG=gt.Graph(subG, prune=True) return subG
[docs]def subgraph_by_node(input_graph,output_graph ,nodeid,direction="in",neighbour_count=3): """extract a connected component holding a node specified by node id: :param input_graph: input graphml file :param output_graph: output graphml file :param nodeid: selected node id :param direction: edge direction (all,in,out) :param neighbour_count: number of neighbours :return: the subgraph :rtype: graph """ g=graph_explore.load_graphml(input_graph, directed=True) g = subgraph_by_direction(g, 2, chosen_node_id=nodeid, direction=direction, neighbour_count=neighbour_count) return g