Page MenuHomec4science

cluster_collection.py
No OneTemporary

File Metadata

Created
Mon, Jul 14, 01:28

cluster_collection.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
" todo:: documentation for module containing class ClusterCollection"
##########################################################################
import math
from math import log10
from metadata_reader import metadataReader
##########################################################################
class ClusterCollection(object):
"""
class Cluster: Documentation TODO
"""
# ------------------------------------------------------------------ #
# Constructors/Destructors #
# ------------------------------------------------------------------ #
def __init__(self):
"""__init__: a cluster collection as a dictionnary of clusters as member, the keys are the clusters cluster_head_id """
# Members ---------------------- #
# dictionnary cluster_collection
self.cluster_collection = {}
def __del__(self):
"""__del__: not implemented """
pass
def addCluster(self,new_cluster):
""" addCluster : add a new cluster to the cluster_collection """
if new_cluster.head_id == 'None' :
print 'Warning : cluster do not have a cluster head ! Not added.'
else :
self.cluster_collection[new_cluster.head_id] = new_cluster
return 0
def getCluster(self,cluster_head_id):
""" getCluster : returns the cluster corresponding to the cluster_head_id given in argument """
return self.cluster_collection[cluster_head_id]
def modifyClusterHead(self,previous_cluster_head_id, new_cluster_head):
""" modifyClusterHead : given the previous id and a sequence object (new_cluster_head), set this sequence as cluster head in the cluster corresponding to the previous cluster head id.
"""
# get the cluster corresponding to previous_cluster_head_id
current_cluster = self.cluster_collection[previous_cluster_head_id]
# replace the cluster head of this cluster
current_cluster.head_id = new_cluster_head.seqID
current_cluster.seq = new_cluster_head.seq
# add the modified cluster to the dictionary
self.cluster_collection[new_cluster_head.seqID] = current_cluster
# delete the previous entry in the cluster collection dictionary
del self.cluster_collection[previous_cluster_head_id]
# maybe there is a shortest way to do that...
return 0
def assignTaxonomy(self,blast_dictionnary,taxonomy_collection):
""" assignTaxonomy: given the blastn results (comparison of sequences with a reference database) stored in blast_dictionnary and the taxonomy collection extracted from the reference database, to attribute a taxonomy to the clusters of the cluster collection """
# go through all the clusters contained in the cluster collecion
for cluster_head_id in self.cluster_collection :
# set the best match reference id of the cluster to the value given by the blast_dictionnary
self.cluster_collection[cluster_head_id].best_match_ref_id = blast_dictionnary[cluster_head_id][0]
# set the taxonomy of the cluster to the value given by the blast_dictionnary
self.cluster_collection[cluster_head_id].taxonomic_affiliation = taxonomy_collection.getTaxonomy(blast_dictionnary[cluster_head_id][0])
# set the similarity wiht the reference of the cluster to the value given by the blast_dictionnary
self.cluster_collection[cluster_head_id].taxon_similarity = blast_dictionnary[cluster_head_id][1]
return 0
def buildClusterTables(self,list_sample_id,output_filename,min_cluster_size):
""" buildClusterTables : construct the table of the evolution of the clusters in the different samples, can be used to construct a heat map. The number of sequences per sample are normalised (so that every sample has the same amount of sequences) and then given in log(n+1) to lower the influence of the big clusters. A taxonomy table is also given as output, it contains the correspondance between the cluster ids and their taxonomic affiliation """
# open the output file (tab separated output file)
output_file = open(output_filename,'w')
# construct the output name of the taxonomy table from the output_filename
cluster_taxonomy_filename = output_filename.split('.')[0] + '_tax.' + output_filename.split('.')[1]
cluster_taxonomy_file = open(cluster_taxonomy_filename,'w')
# write the header
output_file.write('clusterID')
for sampleID in list_sample_id:
output_file.write('\t' + sampleID )
output_file.write('\n')
# count the total of sequences per sample and store the results in a temp dictionnary :
d_total_sample_size = {}
# set the initial sample size to 0
for sampleID in list_sample_id:
d_total_sample_size[sampleID] = 0
for cluster_head_id in self.cluster_collection :
try :
d_total_sample_size[sampleID] += self.cluster_collection[cluster_head_id].d_abundance_per_sample[sampleID]
# if the sample is not in the cluster.d_abundance_per_sample the abundance is zero
except :
d_total_sample_size[sampleID] += 0
# write header for output_file2
cluster_taxonomy_file.write('clusterID' + '\t' + 'refID' + '\t' + 'taxon' + '\n')
for cluster_head_id in self.cluster_collection :
# write the name of the line, that is the cluster_head_id
output_file.write(cluster_head_id)
# then for each sample write the normalized abundance
for sampleID in list_sample_id :
# normalize and take the log(n+1) to minimize the impact of very abundant clusters
try :
nb_seq_norm = self.cluster_collection[cluster_head_id].d_abundance_per_sample[sampleID]*100000/d_total_sample_size[sampleID]
nb_seq_norm = math.log10(nb_seq_norm+1)
output_file.write('\t' + str(nb_seq_norm))
# if there is no sequences from this cluster in this sample
except :
output_file.write('\t' + str(0))
output_file.write('\n')
#output_file1.write('\t' + str(d_taille_per_sample[sampleID]))
cluster_taxonomy_file.write(cluster_head_id)
# write the cluster head id , the id of the matching reference, the corresponding genus and the percentage of identity in the output_file 2
cluster_taxonomy_file.write('\t' + self.cluster_collection[cluster_head_id].best_match_ref_id +'\t' + self.cluster_collection[cluster_head_id].taxonomic_affiliation.genus + '\t' + self.cluster_collection[cluster_head_id].taxon_similarity +'\n')
def storeInDatabase(self,my_db_name):
""" storeInDatabase : stores the cluster collection in the local database """
temp_dictionnary = {}
metadata_reader = metadataReader(None,my_db_name)
for cluster_head_id in self.cluster_collection:
current_cluster = self.getCluster(cluster_head_id)
# build the dictionnary that will be used to transmit the data to the database
# the format is {field_name: value of the cluster}
temp_dictionnary['best_match_ref_id'] = current_cluster.best_match_ref_id
# build a string with the taxonomic informations
temp_dictionnary['taxonomic_affiliation'] = ''
temp_dictionnary['taxonomic_affiliation']+= 'k__' + current_cluster.taxonomic_affiliation.kingdom
temp_dictionnary['taxonomic_affiliation']+= 'p__' + current_cluster.taxonomic_affiliation.phylum
temp_dictionnary['taxonomic_affiliation']+= 'c__' + current_cluster.taxonomic_affiliation.classe
temp_dictionnary['taxonomic_affiliation']+= 'o__' + current_cluster.taxonomic_affiliation.order
temp_dictionnary['taxonomic_affiliation']+= 'f__' + current_cluster.taxonomic_affiliation.family
temp_dictionnary['taxonomic_affiliation']+= 'g__' + current_cluster.taxonomic_affiliation.genus
temp_dictionnary['taxonomic_affiliation']+= 's__' + current_cluster.taxonomic_affiliation.species
temp_dictionnary['taxon_similarity' ] = str(current_cluster.taxon_similarity)
temp_dictionnary['similarity_threshold'] = str(current_cluster.similarity_threshold)
temp_dictionnary['size'] = str(current_cluster.size)
temp_dictionnary['head_id'] = current_cluster.head_id
temp_dictionnary['seq'] = current_cluster.seq
# build a string containing the information of abundance per sample
temp_dictionnary['abundance_per_sample'] = ''
for sampleID in current_cluster.d_abundance_per_sample :
temp_dictionnary['abundance_per_sample'] += sampleID + ';' + str(current_cluster.d_abundance_per_sample[sampleID]) + ';'
temp_dictionnary['abundance_per_sample'] = temp_dictionnary['abundance_per_sample'][:-1]
metadata_reader.store('DNAsequences_clusters',temp_dictionnary)
return 0
##########################################################################
if __name__ == '__main__':
test = ClusterCollection()

Event Timeline