"""This module contains function to manipulate edge and node properties"""
__license__ = "MIT"
__docformat__ = 'reStructuredText'
import sys
import logging
import copy
import pandas as pd
import numpy as np
from graph_tool.all import random_graph, label_components
import graph_tool.all as gt
from pybiomart import Server, Dataset
from pybiomart.dataset import Filter
from pax2graphml import utils
from pax2graphml import extract
# Module-level logger named after this module. The NullHandler does nothing
# with the records it receives, so this module produces no log output on its
# own.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def property_values(g, annot_key):
    """Return the unique values of a node property.

    Short alias: the call is forwarded unchanged to
    :func:`node_property_values`.

    :param g: a graph
    :param annot_key: an existing node property name
    :return: the result of :func:`node_property_values` for the same arguments
    """
    values = node_property_values(g, annot_key)
    return values
def node_property_values(g, annot_key):
    """Return the unique values of an existing node property.

    For ``python::object`` properties a list value contributes each of its
    elements and a dict value contributes each of its values; any other value
    is collected as is.

    :param g: a graph
    :param annot_key: an existing node property name
    :return: the unique node property values, in no particular order
    :rtype: list
    """
    vset = set()
    pkprop = g.vertex_properties[annot_key]
    if str(pkprop.value_type()) == "python::object":
        for v in g.vertices():
            # Index with the vertex itself, as edge_property_values does with
            # edges. A running counter only addresses the right entry when
            # g.vertices() yields the indices 0..N-1 in order.
            vaf = pkprop[v]
            if isinstance(vaf, list):
                vset.update(vaf)
            elif isinstance(vaf, dict):
                vset.update(vaf.values())
            else:
                vset.add(vaf)
    else:
        for v in g.vertices():
            vset.add(pkprop[v])
    return list(vset)
def edge_property_values(g, annot_key):
    """Return the unique values of an existing edge property.

    For ``python::object`` properties a list value contributes each of its
    elements and a dict value contributes each of its values; any other value
    is collected as is.

    :param g: a graph
    :param annot_key: an existing edge property name
    :return: the unique edge property values, in no particular order
    :rtype: list
    """
    prop_map = g.edge_properties[annot_key]
    # Only object-typed properties can hold containers that need flattening.
    holds_objects = str(prop_map.value_type()) == "python::object"
    unique = set()
    for edge in g.edges():
        value = prop_map[edge]
        if holds_objects and isinstance(value, list):
            unique.update(value)
        elif holds_objects and isinstance(value, dict):
            unique.update(value.values())
        else:
            unique.add(value)
    return list(unique)
def describe_properties(g, name=None):
    """Return a description of node and edge properties with names and types.

    One line ``"<node|edge>, <name>, <type> \\n"`` is produced per property,
    node properties first, then edge properties. Properties that are neither
    node nor edge properties are not described.

    :param g: a graph
    :param name: property name (optional). If None, all properties are
        described; otherwise only the node and/or edge property with that name
    :return: the description, or an empty string when nothing matches
    :rtype: string
    """
    node_lines = []
    edge_lines = []
    for (kind, prop_name), prop in g.properties.items():
        # Filter first, so that an unknown name yields "" instead of the
        # description of every other property, and a same-named property of
        # another kind cannot cut the search short.
        if name is not None and prop_name != name:
            continue
        if kind == 'v':
            node_lines.append("%s, %s, %s \n" % ('node', prop_name, prop.value_type()))
        elif kind == 'e':
            edge_lines.append("%s, %s, %s \n" % ('edge', prop_name, prop.value_type()))
    return "".join(node_lines) + "".join(edge_lines)
def defaultNodeValue(gr, prop, default_val):
    """Assign a user-defined value to a node property wherever it is unset.

    A value counts as unset when it is None or when its string form is "".

    :param gr: a graph
    :param prop: an existing node property name
    :param default_val: the value written in place of None and "" values
    :return: void
    :rtype: None
    """
    node_values = gr.vertex_properties[prop]
    for vertex in gr.vertices():
        current = node_values[vertex]
        if current is not None and str(current) != "":
            continue  # already holds a real value
        node_values[vertex] = default_val
def default_edge_value(gr, prop, default_val):
    """Assign a user-defined value to an edge property wherever it is unset.

    A value counts as unset when it is None or when its string form is "".

    :param gr: a graph
    :param prop: an existing edge property name
    :param default_val: the value written in place of None and "" values
    :return: void
    :rtype: None
    """
    edge_values = gr.edge_properties[prop]
    for edge in gr.edges():
        current = edge_values[edge]
        if current is not None and str(current) != "":
            continue  # already holds a real value
        edge_values[edge] = default_val
def define_biomart_server(url, mart_name):
    """Look up a mart on a BioMart server.

    :param url: the url of the BioMart server
    :param mart_name: the mart name
    :return: the entry of the server's ``marts`` mapping for ``mart_name``
    :rtype: mart Object
    """
    return Server(host=url).marts[mart_name]
def client_annot_impl(prot, conf=None):
    """Query a BioMart dataset and group one result column by identifier.

    With the module default configuration this maps UniProt gene symbols to
    GO term accessions.

    :param prot: list of identifiers submitted as values of the
        ``conf['searchkey']`` filter
    :param conf: the configuration dictionary (keys ``server``, ``mart``,
        ``dataset``, ``attr``, ``searchkey``, ``uni_key``, ``annot_key``).
        If None, the module default configuration is used
    :return: a dictionary mapping each identifier (every submitted one, plus
        any other found in the response) to its set of annotations, or to
        None when it has no annotation. None is returned, after printing a
        message, when a result row lacks the ``uni_key`` or ``annot_key``
        column
    :rtype: dict
    """
    if conf is None:
        # The declared default used to fail with a TypeError on the first
        # conf[...] lookup; fall back to the module defaults instead.
        conf = __default_apî_conf()
    mart = define_biomart_server(conf['server'], conf['mart'])
    dataset = mart.datasets[conf['dataset']]
    search_key = conf['searchkey']
    # NOTE(review): reading ``filters`` presumably makes pybiomart load the
    # dataset configuration before the private ``_filters`` mapping is
    # patched below - confirm against the pybiomart version in use.
    dataset.filters
    dataset._filters[search_key] = Filter(search_key, 'text')
    uni_key = conf['uni_key']
    annot_key = conf['annot_key']
    # Every requested identifier gets an entry, even without any hit.
    unimap = {p: set() for p in prot}
    res = dataset.query(attributes=conf['attr'], filters={search_key: prot})
    max_rows = 1000000  # rows beyond this cap are ignored
    for row_count, (_, row) in enumerate(res.iterrows(), start=1):
        if row_count > max_rows:
            break
        try:
            uni = str(row[uni_key])
            goterm = str(row[annot_key])
        except (KeyError, IndexError):
            # Only a missing column is a configuration problem; anything else
            # (including KeyboardInterrupt) must propagate.
            print("error wrong/mission uni_key /annot_key in conf. missing row response key. row is %s, conf is %s" %(row, conf))
            return None
        if uni == "nan":
            continue  # missing identifier: str() of a NaN cell
        annotations = unimap.setdefault(uni, set())
        if goterm != "nan":
            annotations.add(goterm)
    # Identifiers without any annotation are reported as None, not as an
    # empty set.
    for k, v in unimap.items():
        if v is not None and len(v) == 0:
            unimap[k] = None
    return unimap
def __default_apî_conf():
    """Return a fresh copy of the default BioMart configuration.

    The defaults target the Ensembl human gene dataset and describe a lookup
    from UniProt gene symbols to GO term accessions.

    :return: the default configuration
    :rtype: dict
    """
    return dict(
        server='http://www.ensembl.org',
        mart='ENSEMBL_MART_ENSEMBL',
        dataset='hsapiens_gene_ensembl',
        # attributes requested from the dataset
        attr=[
            'ensembl_gene_id',
            'external_gene_name',
            'uniprot_gn_symbol',
            'go_id',
        ],
        # filter the input identifiers are submitted to
        searchkey='uniprot_gn_symbol',
        # result columns read back as identifier and annotation
        uni_key='UniProtKB Gene Name symbol',
        annot_key='GO term accession',
    )
def __current_apî_conf(conf=None):
    """Return the default configuration overridden by the entries of ``conf``.

    :param conf: optional dictionary whose entries replace or extend the
        defaults; None leaves the defaults untouched
    :return: the merged configuration
    :rtype: dict
    """
    merged = __default_apî_conf()
    if conf is None:
        return merged
    merged.update((key, conf[key]) for key in conf.keys())
    return merged
def uniprot_to_go(protein_list, conf=None, chunck_size=50):
    """Annotate UniProt gene symbols through :func:`ensembl_api`.

    :param protein_list: list of Uniprot gene symbols
    :param conf: the configuration dictionary; the module default
        configuration is used when it is None
    :param chunck_size: the size of each chunk of inputs to be submitted in one time
    :return: a dictionary of annotations
    :rtype: dict
    """
    settings = __default_apî_conf() if conf is None else conf
    return ensembl_api(protein_list, settings, chunck_size)
def ensembl_api(in_list, conf=None, chunck_size=50):
    """Annotate a list of identifiers through BioMart, chunk by chunk.

    The inputs are split into consecutive slices of ``chunck_size`` items;
    each slice is submitted with :func:`client_annot_impl` and the resulting
    dictionaries are merged.

    :param in_list: list of inputs identifiers
    :param conf: the configuration dictionary; its entries override the
        module default configuration
    :param chunck_size: the size of each chunk of inputs to be submitted in one time
    :return: a dictionary of annotations, or None as soon as one chunk
        reports a configuration error (``client_annot_impl`` returned None)
    :rtype: dict
    """
    confg = __current_apî_conf(conf)
    merged = dict()
    for start in range(0, len(in_list), chunck_size):
        annotmap = client_annot_impl(in_list[start:start + chunck_size], confg)
        if annotmap is None:
            # client_annot_impl signals a bad configuration with None; passing
            # that to dict.update() would raise an unrelated TypeError.
            return None
        merged.update(annotmap)
    return merged
def is_unique(g, key_prop, exclude_void=True):
    """Tell whether a node property holds a distinct value on every node.

    :param g: a graph
    :param key_prop: the key node property to be evaluated
    :param exclude_void: when True, nodes whose value is None are left out of
        the comparison
    :return: True when no value occurs on more than one of the compared nodes
    :rtype: boolean
    """
    keys = [g.vp[key_prop][node] for node in g.vertices()]
    if exclude_void == True:
        keys = [key for key in keys if key is not None]
    # Duplicates collapse in the set, so the sizes differ iff a value repeats.
    return len(keys) == len(set(keys))
def annot_node_to_file(g, output_prop_file, key_prop, annot_prop, defval=None, excluded_keys=[None, ''], delimiter=','):
    """Export two node properties to a tabular file.

    The first property acts as a key identifying the node, the second as an
    additional annotation attribute. The file starts with a header line made
    of the two property names. The unicity of the key is not tested and
    values are written as is, without quoting.

    :param g: a graph
    :param output_prop_file: the tabular output file (overwritten)
    :param key_prop: the key node property that will be present as a named column in the file. 'index' uses the position of the node in ``g.vertices()`` (from 0)
    :param annot_prop: the additional property to be exported as a named column in the file
    :param defval: value to replace None values in output
    :param excluded_keys: list of key_prop values that will be excluded (only read, never modified)
    :param delimiter: tabular file delimiter
    :rtype: void
    """
    endL = "\n"
    # The context manager closes the file even when a property lookup or a
    # write raises midway.
    with open(output_prop_file, "w") as f:
        f.write("%s%s%s%s" % (key_prop, delimiter, annot_prop, endL))
        for idx, node in enumerate(g.vertices()):
            if key_prop == "index":
                key = str(idx)
            else:
                key = g.vp[key_prop][node]
            annot = g.vp[annot_prop][node]
            if key in excluded_keys:
                continue
            if key is None:
                # Reachable only when the caller removed None from excluded_keys.
                key = 'None'
            if annot is None:
                annot = defval
            f.write("%s%s%s%s" % (key, delimiter, annot, endL))
def annot_edge_to_file(g, output_prop_file, key_prop, annot_prop, defval=None, excluded_keys=[None, ''], delimiter=','):
    """Export two edge properties to a tabular file.

    The first property acts as a key identifying the edge, the second as an
    additional annotation attribute. The file starts with a header line made
    of the two property names. The unicity of the key is not tested and
    values are written as is, without quoting.

    :param g: a graph
    :param output_prop_file: the tabular output file (overwritten)
    :param key_prop: the key edge property that will be present as a named column in the file. 'index' uses the position of the edge in ``g.edges()`` (from 0)
    :param annot_prop: the additional property to be exported as a named column in the file
    :param defval: value to replace None values in output
    :param excluded_keys: list of key_prop values that will be excluded (only read, never modified)
    :param delimiter: tabular file delimiter
    :rtype: void
    """
    endL = "\n"
    # The context manager closes the file even when a property lookup or a
    # write raises midway.
    with open(output_prop_file, "w") as f:
        f.write("%s%s%s%s" % (key_prop, delimiter, annot_prop, endL))
        for idx, edge in enumerate(g.edges()):
            if key_prop == "index":
                key = str(idx)
            else:
                key = g.ep[key_prop][edge]
            annot = g.ep[annot_prop][edge]
            if key in excluded_keys:
                continue
            if key is None:
                # Reachable only when the caller removed None from excluded_keys.
                key = 'None'
            if annot is None:
                annot = defval
            f.write("%s%s%s%s" % (key, delimiter, annot, endL))
def annot_node_from_file(g, annot_file, map_key, new_prop, new_prop_type="string", delimiter=","):
    """Populate the nodes with a new property read from a tabular file.

    The file must have a header line. When it contains columns named
    ``map_key`` and ``new_prop`` those two columns are used, whatever their
    position and however many other columns exist; otherwise the file must
    have exactly two columns, read as (key, value). Nodes whose key is None
    or absent from the file keep the default value of the new property.

    :param g: a graph
    :param annot_file: the tabular annotation file
    :param map_key: the node property holding the primary key that must be present as a named column in the file. 'index' uses the position of the node in ``g.vertices()`` (from 0)
    :param new_prop: the new property to be created that must be present as a named column in the file
    :param new_prop_type: type of the new property ('string','int', 'float', 'long','bool')
    :param delimiter: tabular file delimiter
    :rtype: void
    """
    df = pd.read_csv(annot_file, delimiter=delimiter)
    if map_key in df.columns and new_prop in df.columns:
        # Select by name, as documented, so extra or reordered columns are fine.
        pairs = df[[map_key, new_prop]].values
    else:
        # Historical behaviour: a two-column file read positionally.
        pairs = df.values
    andict = dict(pairs)
    g.vp[new_prop] = g.new_vertex_property(new_prop_type)
    for idx, node in enumerate(g.vertices()):
        key = idx if map_key == 'index' else g.vp[map_key][node]
        if key is not None and key in andict:
            g.vp[new_prop][node] = andict[key]
def annot_edge_from_file(g, annot_file, map_key, new_prop, new_prop_type="string", delimiter=","):
    """Populate the edges with a new property read from a tabular file.

    The file must have a header line. When it contains columns named
    ``map_key`` and ``new_prop`` those two columns are used, whatever their
    position and however many other columns exist; otherwise the file must
    have exactly two columns, read as (key, value). Edges whose key is None
    or absent from the file keep the default value of the new property.

    :param g: a graph
    :param annot_file: the tabular annotation file
    :param map_key: the edge property holding the primary key that must be present as a named column in the file. 'index' uses the position of the edge in ``g.edges()`` (from 0)
    :param new_prop: the new property to be created that must be present as a named column in the file
    :param new_prop_type: type of the new property ('string','int', 'float', 'long','bool')
    :param delimiter: tabular file delimiter
    :rtype: void
    """
    df = pd.read_csv(annot_file, delimiter=delimiter)
    if map_key in df.columns and new_prop in df.columns:
        # Select by name, as documented, so extra or reordered columns are fine.
        pairs = df[[map_key, new_prop]].values
    else:
        # Historical behaviour: a two-column file read positionally.
        pairs = df.values
    andict = dict(pairs)
    g.ep[new_prop] = g.new_edge_property(new_prop_type)
    for idx, edge in enumerate(g.edges()):
        key = idx if map_key == 'index' else g.ep[map_key][edge]
        if key is not None and key in andict:
            g.ep[new_prop][edge] = andict[key]
def copy_node_properties(g, sourceNode, targetNode):
    """Overwrite every node property of the target node with the source's value.

    :param g: a graph
    :param sourceNode: source node
    :param targetNode: target node
    :rtype: void
    """
    node_props = g.vp
    for prop_name in g.vertex_properties.keys():
        prop_map = node_props[prop_name]
        prop_map[targetNode] = prop_map[sourceNode]
def copy_edge_properties(g, source_edge, target_edge):
    """Overwrite every edge property of the target edge with the source's value.

    :param g: a graph
    :param source_edge: source edge
    :param target_edge: target edge
    :rtype: void
    """
    edge_props = g.ep
    for prop_name in g.edge_properties.keys():
        prop_map = edge_props[prop_name]
        prop_map[target_edge] = prop_map[source_edge]
def populate_shape(g, shapes=None):
    """Define the node shapes by translating the values of the "shape" node property.

    :param g: a graph
    :param shapes: optional dictionary of existing values (dict keys) associated with the new values (dict values). When given it replaces the default table entirely
    :return: count of modified nodes
    """
    # default shapes
    map_values = {
        "0": "Ellipse",
        "1": "Rectangle",
        "2": "Octagon",
        "3": "Triangle",
        "4": "Parallelogram",
        "5": "Round Rectangle",
        "6": "Rectangle",
        "7": "Diamond",
        "8": "V"
    }
    if shapes is not None:
        map_values = shapes
    # Hand the count back to the caller, as documented and as populate_color does.
    return replace_property_values(g, "shape", map_values)
def populate_color(g, colors=None):
    """Define the node colors by translating the values of the "color" node property.

    :param g: a graph
    :param colors: optional dictionary of existing values (dict keys) associated with the new values (dict values). When given it replaces the default table entirely
    :return: count of modified nodes
    """
    default_colors = {
        "0": "Red",
        "1": "FireBrick",
        "2": "LimeGreen",
        "3": "LightGreen",
        "4": "SpringGreen",
        "5": "SeaGreen",
        "6": "YellowGreen",
        "7": "CornflowerBlue",
        "8": "LightSalmon",
        "9": "Coral",
    }
    chosen = default_colors if colors is None else colors
    return replace_property_values(g, "color", chosen)
def replace_property_values(g, prop_name, map_values, entity_type="node"):
    """Replace the values of a property according to a translation table.

    Nothing is changed, and 0 is returned, when the property does not exist
    or when ``entity_type`` is neither "node" nor "edge".

    :param g: a graph
    :param prop_name: the target property name
    :param map_values: dictionary of existing values (dict keys) associated with the new values (dict values)
    :param entity_type: related entity type, "node" for node, "edge" for edge
    :return: count of modified entities
    """
    if entity_type == "node":
        props, iter_entities = g.vp, g.vertices
    elif entity_type == "edge":
        props, iter_entities = g.ep, g.edges
    else:
        return 0
    if prop_name not in props.keys():
        return 0
    replaced = 0
    for entity in iter_entities():
        old_value = props[prop_name][entity]
        if old_value in map_values.keys():
            props[prop_name][entity] = map_values[old_value]
            replaced += 1
    return replaced
def string_to_list_property(g, string_prop, new_property=None, sep=";", entity="node"):
    """Convert a string property into an object property holding a list.

    Each value is split on ``sep``. Any ``entity`` other than "node" or
    "edge" leaves the graph untouched.

    :param g: a graph
    :param string_prop: initial property
    :param new_property: new property name; if None, the initial property is replaced
    :param sep: string separator used in the string property
    :param entity: "node" or "edge", the kind of entity the properties belong to
    :return: void
    """
    # Name under which the converted property is stored.
    target = string_prop if new_property is None else new_property
    if entity == "node":
        split_map = g.new_vertex_property("object")
        for vertex in g.vertices():
            split_map[vertex] = g.vp[string_prop][vertex].split(sep)
        if new_property is None:
            del g.vp[string_prop]
        g.vp[target] = split_map
    elif entity == "edge":
        split_map = g.new_edge_property("object")
        for edge in g.edges():
            split_map[edge] = g.ep[string_prop][edge].split(sep)
        if new_property is None:
            del g.ep[string_prop]
        g.ep[target] = split_map
def list_to_string_property(g, string_prop, new_property=None, sep=";", entity="node"):
    """Convert a property holding lists into a concatenated string property.

    The elements of each list are joined with ``sep``. An empty or missing
    (None) value gives "". Any ``entity`` other than "node" or "edge" leaves
    the graph untouched.

    :param g: a graph
    :param string_prop: initial property (holding lists of strings)
    :param new_property: new property name; if None, the initial property is replaced
    :param sep: string separator used in the string property
    :param entity: "node" or "edge", the kind of entity the properties belong to
    :return: void
    """
    if entity == "node":
        props, items, nprop = g.vp, g.vertices(), g.new_vertex_property("string")
    elif entity == "edge":
        # Edge properties are keyed by edges, so iterate g.edges() here.
        props, items, nprop = g.ep, g.edges(), g.new_edge_property("string")
    else:
        return
    for item in items:
        ln = props[string_prop][item]
        # A one-element list joins to that element; only an empty or missing
        # value yields "".
        nprop[item] = sep.join(ln) if ln else ""
    if new_property is None:
        del props[string_prop]
        props[string_prop] = nprop
    else:
        props[new_property] = nprop
def change_property_type(g, property_name, property_type, entity="node"):
    """Change the type of a node or edge property.

    A new property of the requested type is created, every value is passed
    through ``utils.__convertType`` and the result is stored under the same
    name. Any ``entity`` other than "node" or "edge" leaves the graph
    untouched.

    :param g: a graph
    :param property_name: the name of the property to be affected
    :param property_type: the new primitive property type (string,int,bool,float,double)
    :param entity: "node" or "edge", the kind of entity the property belongs to
    :return: void
    """
    if entity == "node":
        props, make_property, iter_entities = g.vp, g.new_vertex_property, g.vertices
    elif entity == "edge":
        props, make_property, iter_entities = g.ep, g.new_edge_property, g.edges
    else:
        return
    old_map = props[property_name]
    converted = make_property(property_type)
    for item in iter_entities():
        converted[item] = utils.__convertType(old_map[item], property_type)
    props[property_name] = converted
def create_property_from_map(g, annot_map, primary_key, new_property, case_sensitive=False):
    """Create a new node property from a dictionary.

    Each node whose primary key matches a key of ``annot_map`` receives the
    associated value. Entries whose value is None are ignored. When several
    keys differ only by case in case-insensitive mode, the last one in
    dictionary order wins.

    :param g: a graph
    :param annot_map: a dictionary (primary key value -> annotation)
    :param primary_key: the primary key property (e.g. uri, uniprot...)
    :param new_property: the new property name. It is created with type 'object'
    :param case_sensitive: define if the primary key mapping is case sensitive or not
    :return: void
    """
    g.vp[new_property] = g.new_vertex_property("object")
    ignore_case = not case_sensitive
    # Normalise the keys once so that each node costs a single dictionary
    # lookup instead of a scan over every key of annot_map.
    lookup = {}
    for k, val in annot_map.items():
        if val is not None:
            lookup[k.lower() if ignore_case else k] = val
    for node in g.vertices():
        pk = g.vp[primary_key][node]
        if pk is None:
            continue
        rk = pk.lower() if ignore_case else pk
        if rk in lookup:
            g.vp[new_property][node] = lookup[rk]
def count_edges_by_values(gr, att):
    """Count edges for each value of an input property.

    The counted values are those returned by :func:`edge_property_values`.
    An edge is counted for a value when its property is not None and equal
    to it, so a None value always gets a count of 0.

    :param gr: a graph
    :param att: an existing edge property name
    :return: a dictionary (property value/count)
    :rtype: dict
    """
    # The leftover debug print of the value list was dropped; the node
    # counterpart never printed anything.
    avlist = edge_property_values(gr, att)
    mc = dict()
    for val in avlist:
        ct = 0
        for e in gr.edges():
            cval = gr.ep[att][e]
            if cval is not None and cval == val:
                ct = ct + 1
        mc[val] = ct
    return mc
def count_nodes_by_values(gr, att):
    """Count nodes for each value of an input property.

    The counted values are those returned by :func:`node_property_values`.
    A node is counted for a value when its property is not None and equal to
    it, so a None value always gets a count of 0.

    :param gr: a graph
    :param att: an existing node property name
    :return: a dictionary (property value/count)
    :rtype: dict
    """
    tally = {}
    for candidate in node_property_values(gr, att):
        node_values = (gr.vp[att][node] for node in gr.vertices())
        tally[candidate] = sum(
            1 for current in node_values
            if current is not None and current == candidate
        )
    return tally