Page MenuHomec4science

models.py
No OneTemporary

File Metadata

Created
Fri, Jun 7, 05:08

models.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Neural Networks models module.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
import numpy as np
from graph import laplacian_power_basis
from parameters import *
class SpectralConv(nn.Module):
    """Spectral graph-convolution layer.

    A learned linear combination (``fc1``) of the Laplacian power basis
    (powers L^0..L^M) produces ``filter_size_out`` spectral filter
    operators.  Each operator is applied per input channel, restricted to
    the columns listed in ``nodes_of_interest``, and the
    ``filter_size_in`` channel responses are mixed down to one value per
    (node, filter) by ``fc2``.

    NOTE(review): tensor shapes below are taken from the inline shape
    comments ("B x K_ x K x N" etc.) — confirm against callers.
    """

    def __init__(self, laplacian_matrix, num_nodes, filter_size_in, nodes_of_interest, filter_size_out, degree_of_polynomial):
        super(SpectralConv, self).__init__()
        self.laplacian_matrix = laplacian_matrix
        self.num_nodes = num_nodes
        self.filter_size_in = filter_size_in # K_
        self.nodes_of_interest = nodes_of_interest
        self.filter_size_out = filter_size_out # K
        self.degree_of_polynomial = degree_of_polynomial # M
        # fc1 combines the M+1 Laplacian powers into K filter operators.
        self.fc1 = nn.Linear(self.degree_of_polynomial+1, self.filter_size_out, bias=False)
        # fc2 mixes the K_ input channels down to a single output channel.
        self.fc2 = nn.Linear(self.filter_size_in, 1, bias=False)

    def forward(self, input):
        def select_columns(filter_operator, num_nodes):
            """ Preserves the columns of the filter operator having an index in nodes of interest
            and sets all other columns to zero. """
            filter_operator = filter_operator.view(self.filter_size_out, self.num_nodes, self.num_nodes)
            filter_operator_batch = []
            for i in range(len(self.nodes_of_interest)):  # one entry per batch item
                filter_operator_sel_col = []
                for j in range(len(self.nodes_of_interest[i])):  # one list per input channel
                    mask = Variable(torch.zeros(self.filter_size_out, self.num_nodes, self.num_nodes))
                    for k in range(len(self.nodes_of_interest[i][j])):
                        # Keep this column in every one of the K operators.
                        mask[:,:,self.nodes_of_interest[i][j][k]] = 1
                    filter_operator_sel_col.append(filter_operator * mask)
                filter_operator_batch.append(torch.stack(filter_operator_sel_col))
            return torch.stack(filter_operator_batch)

        def apply_filter_operator(input, filter_operator):
            """ Applies the filter operator for each input channel. """
            # input = input.view(BATCH_SIZE, -1) implicit
            filter_operator_sel_col = select_columns(filter_operator, self.num_nodes).view(BATCH_SIZE, -1)
            output = []
            for i in range(BATCH_SIZE):
                # Per batch item: (N x rest)^T @ (N x channels) filter response.
                output.append(torch.matmul(filter_operator_sel_col[i,:].view(self.num_nodes, -1).t(), input[i,:].view(self.num_nodes,-1)))
            return torch.stack(output)

        # Powers L^0..L^M of the graph Laplacian; fc1 blends them into the
        # K spectral filter operators.
        laplacian_tensor = laplacian_power_basis(self.laplacian_matrix, self.degree_of_polynomial)
        filter_operator = self.fc1(Variable(laplacian_tensor)) # (NxN) x K
        y = apply_filter_operator(input, filter_operator) # B x K_ x K x N
        y = y.view(-1, self.filter_size_in) # (NxKxB) x K_
        z = self.fc2(y)
        z = z.view(BATCH_SIZE, -1) # B x (NxK)
        return z
class DynamicPool(nn.Module):
    """Dynamic pooling layer.

    For each batch item and each of the ``filter_size_dim`` filter maps,
    keeps the ``num_active_nodes`` largest non-zero activations (fewer if
    the map has fewer non-zeros) and zeroes out every other node.  Also
    reports which node indices were kept, so the next spectral layer can
    restrict its filter operators to those nodes.
    """

    def __init__(self, num_nodes, filter_size_dim, num_active_nodes):
        super(DynamicPool, self).__init__()
        self.num_nodes = num_nodes
        self.filter_size_dim = filter_size_dim
        self.num_active_nodes = num_active_nodes

    def forward(self, input):
        """Returns ``(output, nodes_of_interest)`` where ``output`` is the
        masked input flattened to ``B x (K*N)`` and ``nodes_of_interest`` is
        a nested list ``[batch][filter] -> kept node indices``."""
        input = input.view(BATCH_SIZE, self.filter_size_dim, -1) # B x K x N
        mask = []
        nodes_of_interest = []
        for i in range(BATCH_SIZE):
            indices_omega = []  # union of kept indices over all filters
            mask_by_batch = torch.zeros(self.num_nodes)
            nodes_of_interest_by_batch = []
            for j in range(self.filter_size_dim):
                input_slice = input[i, j, ].clone()
                input_slice = input_slice.view(-1)
                nodes_of_interest_by_filter = []
                # BUG FIX: the original reused loop variable `j` for this inner
                # loop, shadowing the filter index.  Also use builtin min()
                # instead of np.minimum for two plain ints.
                num_picks = min(self.num_active_nodes, torch.nonzero(input_slice).size()[0])
                for _ in range(num_picks):
                    value, indice = torch.max(input_slice, 0)
                    indices_omega.extend(indice.data)
                    nodes_of_interest_by_filter.extend(indice.data)
                    # Zero the current maximum so the next torch.max finds
                    # the runner-up.
                    input_slice[indice.data] = 0
                nodes_of_interest_by_batch.append(nodes_of_interest_by_filter)
            nodes_of_interest.append(nodes_of_interest_by_batch)
            mask_by_batch[list(set(indices_omega))] = 1
            # Broadcast the per-node mask over all K filter maps (K x N).
            mask.append(mask_by_batch.unsqueeze_(-1).expand(self.num_nodes, self.filter_size_dim).transpose(1,0))
        mask = Variable(torch.stack(mask))
        output = input * mask
        output = output.view(BATCH_SIZE, -1)
        return output, nodes_of_interest
class TIGraNet(nn.Module):
    """Transformation-invariant graph network.

    Two spectral-convolution + dynamic-pooling stages followed by a
    temporary linear head (``self.test``) so the model runs end to end.
    Trains with Adam and an MSE loss.
    """

    def __init__(self, laplacian_matrix, lr):
        super(TIGraNet, self).__init__()

        def init_nodes_of_interest(num_nodes, batch_size):
            """ Initializes the nodes of interest by including all the nodes of the graph. """
            return [[list(range(num_nodes))] for _ in range(batch_size)]

        self.num_nodes = laplacian_matrix.size()[0]
        self.nodes_of_interest = init_nodes_of_interest(num_nodes=self.num_nodes, batch_size=BATCH_SIZE)
        # Main layers
        self.spectral_conv1 = SpectralConv(laplacian_matrix=laplacian_matrix, num_nodes=self.num_nodes, filter_size_in=1, nodes_of_interest=self.nodes_of_interest, filter_size_out=3, degree_of_polynomial=3)
        self.dynamic_pool1 = DynamicPool(num_nodes=self.num_nodes, filter_size_dim=3, num_active_nodes=300)
        self.spectral_conv2 = SpectralConv(laplacian_matrix=laplacian_matrix, num_nodes=self.num_nodes, filter_size_in=3, nodes_of_interest=self.nodes_of_interest, filter_size_out=6, degree_of_polynomial=3)
        self.dynamic_pool2 = DynamicPool(num_nodes=self.num_nodes, filter_size_dim=6, num_active_nodes=100)
        # Temporary layer to run correctly
        self.test = nn.Linear(784*6, 1)
        self.optimizer = optim.Adam(self.parameters(), lr=lr)
        self.loss_function = torch.nn.MSELoss()

    def init_weights(self):
        """ Initializes the weights of the model. """
        for m in self.modules():
            if isinstance(m, SpectralConv):
                # BUG FIX: SpectralConv has no `weight_b` attribute (it holds
                # `fc1`/`fc2`), and Tensor.uniform_ takes no `a`/`b` keywords
                # (its parameters are `from`/`to`), so the original line
                # raised on call.  Initialize the channel-mixing weights
                # uniformly in [0, 1) as originally intended.
                # TODO(review): decide on an initialization for fc1 (the
                # original left `weight_a` as an open question).
                m.fc2.weight.data.uniform_(0, 1)

    def forward(self, input):
        spectral_conv1 = self.spectral_conv1(input)
        # Each pooling stage also refreshes the nodes of interest used by
        # the spectral layers.
        dynamic_pool1, self.nodes_of_interest = self.dynamic_pool1(spectral_conv1)
        spectral_conv2 = self.spectral_conv2(dynamic_pool1)
        dynamic_pool2, self.nodes_of_interest = self.dynamic_pool2(spectral_conv2)
        output = self.test(dynamic_pool2)
        return output

    def step(self, input, target):
        """Runs one optimization step and returns the scalar loss."""
        self.train()
        self.zero_grad()
        out = self.forward(input)
        loss = self.loss_function(out, target)
        loss.backward()
        self.optimizer.step()
        # Old (pre-0.4) PyTorch scalar extraction, consistent with the
        # Variable-based API used throughout this file.
        return loss.data[0]

    def predict(self, input):
        """Forward pass without any training side effects."""
        return self.forward(input)

Event Timeline