Try GCN QSPR with pytorch based graph library #RDKit #Pytorch #dgl

Recently, many machine learning articles use PyTorch for their implementations. I found a very attractive package for graph-based deep learning named 'DGL' (Deep Graph Library). The package supports PyTorch and MXNet as backends. The authors provide not only the package but also very nice documentation. I read the documentation and tried a GCN for QSPR with DGL.

https://www.dgl.ai/pages/about.html

The package can build a graph object from a networkx graph and can convert its graph object back to a networkx object.
The following code example is based on their tutorial about batched graph convolution.
https://docs.dgl.ai/tutorials/basics/4_batch.html
First, import the packages that will be used.

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
from collections import namedtuple
import dgl
from dgl import DGLGraph
import dgl.function as fn

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, TensorDataset
from torch.utils.data import DataLoader
import torch.optim as optim

import networkx as nx
import copy
import os
from rdkit import Chem
from rdkit.Chem import RDConfig
import numpy as np

Next, define the mol2graph function. In the following example only node (atom) features are used; bond features are not used.

# Following code borrowed from dgl's junction tree example.
# Vocabulary of element symbols for the one-hot atom encoding; the last entry
# ('unknown') is the fallback used by onek_encoding_unk for unlisted symbols.
ELEM_LIST = ['C', 'N', 'O', 'S', 'F', 'Si', 'P', 'Cl', 'Br', 'Mg', 'Na', 'Ca', 'Fe', 'Al', 'I', 'B', 'K', 'Se', 'Zn', 'H', 'Cu', 'Mn', 'unknown']

# Length of the per-atom feature vector built by atom_features:
# element one-hot + 6 degree slots + 5 formal-charge slots + 1 aromaticity flag.
ATOM_FDIM = len(ELEM_LIST) + 6 + 5 + 1
# NOTE(review): not referenced in the visible code - presumably left over from the borrowed example.
MAX_ATOMNUM =60
# Matches the five entries returned by bond_features.
BOND_FDIM = 5 
# NOTE(review): not referenced in the visible code - presumably left over from the borrowed example.
MAX_NB = 10

# Read from the PAPER environment variable (False when unset); not used in the visible code.
PAPER = os.getenv('PAPER', False)

def onek_encoding_unk(x, allowable_set):
    """Return a one-hot list of booleans for *x* over *allowable_set*.

    A value that is not a member of *allowable_set* is encoded as the last
    entry of the set, which therefore acts as the "unknown" bucket.
    """
    key = x if x in allowable_set else allowable_set[-1]
    return [key == candidate for candidate in allowable_set]

# Note that during graph decoding they don't predict stereochemistry-related
# characteristics (i.e. Chiral Atoms, E-Z, Cis-Trans).  Instead, they decode
# the 2-D graph first, then enumerate all possible 3-D forms and find the
# one with highest score.
'''
def atom_features(atom):
    return (torch.Tensor(onek_encoding_unk(atom.GetSymbol(), ELEM_LIST)
            + onek_encoding_unk(atom.GetDegree(), [0,1,2,3,4,5])
            + onek_encoding_unk(atom.GetFormalCharge(), [-1,-2,1,2,0])
            + [atom.GetIsAromatic()]))
'''
def atom_features(atom):
    """Build the feature list (booleans) describing one RDKit atom.

    The list concatenates, in order: the element one-hot over ELEM_LIST, the
    degree one-hot over 0..5, the formal-charge one-hot over
    [-1, -2, 1, 2, 0] and a single aromaticity flag. Values outside a
    vocabulary fall into that vocabulary's last slot (see onek_encoding_unk).
    """
    element = onek_encoding_unk(atom.GetSymbol(), ELEM_LIST)
    degree = onek_encoding_unk(atom.GetDegree(), [0, 1, 2, 3, 4, 5])
    charge = onek_encoding_unk(atom.GetFormalCharge(), [-1, -2, 1, 2, 0])
    aromatic = [atom.GetIsAromatic()]
    return element + degree + charge + aromatic

def bond_features(bond):
    """Return a 5-element tensor describing one RDKit bond.

    The entries are flags for: single, double, triple, aromatic bond type,
    and ring membership.
    """
    bond_type = bond.GetBondType()
    kinds = Chem.rdchem.BondType
    flags = [
        bond_type == kinds.SINGLE,
        bond_type == kinds.DOUBLE,
        bond_type == kinds.TRIPLE,
        bond_type == kinds.AROMATIC,
        bond.IsInRing(),
    ]
    return torch.Tensor(flags)

def mol2dgl_single(mols):
    """Convert RDKit molecules into DGL graphs that carry atom features.

    Each molecule becomes one graph with a node per atom and two directed
    edges (one per direction) per bond. Atom feature vectors from
    atom_features are stored as node data under the key 'h'. Bond features
    are not attached to the graphs.

    Args:
        mols: iterable of RDKit molecules.

    Returns:
        A list of DGLGraph objects, one per input molecule, in input order.
    """
    cand_graphs = []

    for mol in mols:
        graph = DGLGraph()

        node_feats = []
        for i, atom in enumerate(mol.GetAtoms()):
            # Node i of the graph must correspond to atom index i.
            assert i == atom.GetIdx()
            node_feats.append(atom_features(atom))
        graph.add_nodes(mol.GetNumAtoms())

        bond_src = []
        bond_dst = []
        for bond in mol.GetBonds():
            begin_idx = bond.GetBeginAtomIdx()
            end_idx = bond.GetEndAtomIdx()
            # Add both directions so messages flow either way along a bond.
            bond_src.extend((begin_idx, end_idx))
            bond_dst.extend((end_idx, begin_idx))
        graph.add_edges(bond_src, bond_dst)

        graph.ndata['h'] = torch.Tensor(node_feats)
        cand_graphs.append(graph)
    return cand_graphs

Next, I defined a custom collate function for the data loader, together with the msg and reduce functions.
The msg function takes the message from a neighbor node, and the reduce function aggregates the messages.

# Built-in DGL message function: copies the source-node field 'h' into the message field 'm'.
msg = fn.copy_src(src="h", out="m")
def collate(sample):
    """Merge a list of (graph, label) pairs into one batch for the DataLoader.

    Returns a tuple of the batched DGL graph and a tensor holding the labels
    in the same order as the input pairs.
    """
    graphs, labels = (list(column) for column in zip(*sample))
    return dgl.batch(graphs), torch.tensor(labels)
def reduce(nodes):
    """Aggregate incoming messages by averaging them.

    Reads the mailbox field 'm' and returns the mean over dimension 1 as the
    new node field 'h'. Averaging (rather than summing) is the part that
    differs from the referenced example.
    """
    messages = nodes.mailbox['m']
    return {'h': messages.mean(dim=1)}

Then I defined the network. With DGL, the user can easily access nodes and their features. For example, graph.ndata['name'] accesses the node features named 'name'. NodeApplyModule is used for the per-node calculation.
It is worth knowing that by using torch's nn.ModuleList, the user can write a network in a 'Keras'-like style.

class NodeApplyModule(nn.Module):
    """Per-node update: a linear layer followed by an activation.

    Args:
        in_feats: size of the incoming node feature 'h'.
        out_feats: size of the node feature produced.
        activation: callable applied to the linear layer's output.
    """

    def __init__(self, in_feats, out_feats, activation):
        super().__init__()
        self.linear = nn.Linear(in_feats, out_feats)
        self.activation = activation

    def forward(self, node):
        """Transform the node field 'h' and return it under the same key."""
        projected = self.linear(node.data['h'])
        return {'h': self.activation(projected)}
    

class GCN(nn.Module):
    """One graph-convolution layer: message passing, then a per-node update.

    Args:
        in_feats: size of the incoming node features.
        out_feats: size of the node features produced.
        activation: callable handed to the NodeApplyModule.
    """

    def __init__(self, in_feats, out_feats, activation):
        super().__init__()
        self.apply_mod = NodeApplyModule(in_feats, out_feats, activation)

    def forward(self, g, feature):
        """Run one round of message passing on *g* starting from *feature*.

        The features are stored as node field 'h', propagated with the
        module-level msg/reduce functions, passed through the node-apply
        module, and finally removed from the graph and returned.
        """
        g.ndata['h'] = feature
        g.update_all(msg, reduce)
        g.apply_nodes(func=self.apply_mod)
        return g.ndata.pop('h')
    
class Classifier(nn.Module):
    """Graph classifier: two GCN layers, mean readout, linear output layer.

    Args:
        in_dim: size of the input node features.
        hidden_dim: size of the hidden node features.
        n_classes: number of output scores per graph.
    """

    def __init__(self, in_dim, hidden_dim, n_classes):
        super().__init__()
        first = GCN(in_dim, hidden_dim, F.relu)
        second = GCN(hidden_dim, hidden_dim, F.relu)
        self.layers = nn.ModuleList([first, second])
        self.classify = nn.Linear(hidden_dim, n_classes)

    def forward(self, g):
        """Return one row of class scores per graph in the batched graph *g*."""
        hidden = g.ndata['h']
        for layer in self.layers:
            hidden = layer(g, hidden)
        # Write the final node states back so the readout can average them.
        g.ndata['h'] = hidden
        graph_repr = dgl.mean_nodes(g, 'h')
        return self.classify(graph_repr)

Let's load the data. I used the solubility dataset shipped with RDKit.

# Map the SOL_classification property values to integer class labels.
solcls = {'(A) low':0, '(B) medium':1, '(C) high':2}

train_path = os.path.join(RDConfig.RDDocsDir,'Book/data/solubility.train.sdf')
test_path = os.path.join(RDConfig.RDDocsDir,'Book/data/solubility.test.sdf')

# SDMolSupplier yields None for records it cannot parse; drop those before
# reading properties so GetProp is never called on None. Labels are derived
# from the filtered lists, so molecules and labels stay aligned.
train_mols = [m for m in Chem.SDMolSupplier(train_path) if m is not None]
train_y = [solcls[m.GetProp('SOL_classification')] for m in train_mols]
test_mols = [m for m in Chem.SDMolSupplier(test_path) if m is not None]
test_y = [solcls[m.GetProp('SOL_classification')] for m in test_mols]

# One DGL graph per molecule, in the same order as the label lists.
train_graphs = mol2dgl_single(train_mols)
test_graphs = mol2dgl_single(test_mols)

# Pair each training graph with its label and batch them with the custom collate function.
dataset = list(zip(train_graphs, train_y))
data_loader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate)

Finally, run the training and check the performance.

# Classifier over ATOM_FDIM-sized atom features, 256 hidden units, one score per solubility class.
model = Classifier(ATOM_FDIM, 256, len(solcls))
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
model.train()

# Mean batch loss of every epoch, kept for plotting afterwards.
epoch_losses = []
for epoch in range(200):
    epoch_loss = 0
    for i, (bg, label) in enumerate(data_loader):
        # Use zero initializers for edge and node fields of the batched graph.
        # NOTE(review): presumably so nodes that receive no messages get zeros - confirm against the DGL docs.
        bg.set_e_initializer(dgl.init.zero_initializer)
        bg.set_n_initializer(dgl.init.zero_initializer)        
        pred = model(bg)
        loss = loss_func(pred, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss += loss.detach().item()
    # i is the index of the last batch, so i + 1 is the number of batches.
    epoch_loss /= (i + 1)
    # Report progress every 20 epochs.
    if (epoch+1) % 20 == 0:
        print('Epoch {}, loss {:.4f}'.format(epoch+1, epoch_loss))
    epoch_losses.append(epoch_loss)

>Epoch 20, loss 0.6104
>Epoch 40, loss 0.5616
>Epoch 60, loss 0.5348
>Epoch 80, loss 0.5095
>Epoch 100, loss 0.4915
>Epoch 120, loss 0.5163
>Epoch 140, loss 0.5348
>Epoch 160, loss 0.4385
>Epoch 180, loss 0.4421
>Epoch 200, loss 0.4318
# Plot the per-epoch mean training loss.
plt.plot(epoch_losses, c='b')

# Evaluate on the whole test set as a single batched graph.
model.eval()
test_bg = dgl.batch(test_graphs)
# NOTE(review): test_y_tensor is not used below; kept in case later cells rely on it.
test_y_tensor = torch.tensor(test_y).float().view(-1,1)
test_bg.set_e_initializer(dgl.init.zero_initializer)
test_bg.set_n_initializer(dgl.init.zero_initializer)
# Inference only: disable autograd so no computation graph is built for the test batch.
with torch.no_grad():
    logit = model(test_bg)
# Class probabilities per molecule; the predicted class is the most probable one.
probs = torch.softmax(logit, 1).numpy()
pred_y = np.argmax(probs,1)

from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
# Fraction of test molecules whose predicted class equals the label (shown as the cell output).
accuracy_score(test_y, pred_y)
>0.7587548638132295
# Per-class precision / recall / F1 for the GCN predictions.
print(classification_report(test_y, pred_y))
              precision    recall  f1-score   support

           0       0.70      0.86      0.78       102
           1       0.79      0.64      0.71       115
           2       0.87      0.82      0.85        40

   micro avg       0.76      0.76      0.76       257
   macro avg       0.79      0.78      0.78       257
weighted avg       0.77      0.76      0.76       257

Hmm, not so bad. OK, I tried random forest next. I would like to use RDKit's new descriptors, the 'dragon-type descriptors'. So I used them ;)

https://github.com/rdkit/UGM_2018/blob/master/Notebooks/Landrum_Whats_New.ipynb

from rdkit.Chem import AllChem
from rdkit.Chem.Descriptors import rdMolDescriptors
from sklearn.preprocessing import normalize
# generate 3D conf
ps = AllChem.ETKDGv2()


def _embed_with_hs(mol, params):
    """Return a hydrogen-added copy of *mol* carrying one embedded 3D conformer.

    Chem.AddHs returns a new molecule, so the result has to be kept and
    returned; rebinding a loop variable would silently discard it.

    Raises:
        ValueError: if the embedding fails, because the 3D descriptors
            computed afterwards need a 3D conformer.
    """
    mol_h = Chem.AddHs(mol)
    # EmbedMolecule returns the id of the new conformer, or -1 on failure.
    if AllChem.EmbedMolecule(mol_h, params) < 0:
        raise ValueError('3D embedding failed for ' + Chem.MolToSmiles(mol))
    return mol_h


# Build new lists so the originals (used for the graph model) stay untouched
# and every molecule keeps its position relative to the label lists.
train_mols2 = [_embed_with_hs(m, ps) for m in train_mols]
test_mols2 = [_embed_with_hs(m, ps) for m in test_mols]
def calc_dragon_type_desc(mol):
    """Return the concatenated 3D descriptor values of *mol*.

    The result joins, in order, the AUTOCORR3D, MORSE, RDF and WHIM
    descriptors computed by rdMolDescriptors.
    """
    autocorr = rdMolDescriptors.CalcAUTOCORR3D(mol)
    morse = rdMolDescriptors.CalcMORSE(mol)
    rdf = rdMolDescriptors.CalcRDF(mol)
    whim = rdMolDescriptors.CalcWHIM(mol)
    return autocorr + morse + rdf + whim
# Descriptor matrices, one row per molecule.
# NOTE(review): sklearn's normalize rescales each row (sample) by default, not each
# descriptor column - confirm that per-sample scaling is what is intended here.
train_X = normalize([calc_dragon_type_desc(m) for m in train_mols2])
test_X = normalize([calc_dragon_type_desc(m) for m in test_mols2])

For convenience, I use only one conformer per molecule in the above code.

from sklearn.ensemble import RandomForestClassifier
# Random forest baseline with 100 trees, trained on the descriptor matrix.
# NOTE(review): no random_state is set, so results can vary between runs.
rfc = RandomForestClassifier(n_estimators=100)
rfc.fit(train_X, train_y)
rf_pred_y = rfc.predict(test_X)
# Accuracy on the test set (shown as the cell output).
accuracy_score(test_y, rf_pred_y)
>0.7587548638132295
# Per-class precision / recall / F1 for the random forest predictions.
print(classification_report(test_y, rf_pred_y))
              precision    recall  f1-score   support

           0       0.77      0.87      0.82       102
           1       0.79      0.66      0.72       115
           2       0.67      0.75      0.71        40

   micro avg       0.76      0.76      0.76       257
   macro avg       0.74      0.76      0.75       257
weighted avg       0.76      0.76      0.76       257

Random forest showed the same performance as the GCN in this test.
I think DGL is a very useful package for graph-based deep learning. Their original repo provides many code examples! Readers who are interested, please check the repo.

I uploaded today's code to my GitHub repository; it can be viewed at the following URL.

https://nbviewer.jupyter.org/github/iwatobipen/playground/blob/master/GCN_chemo.ipynb

Advertisements

Build QSAR model with pytorch and rdkit #RDKit

There are many deep learning frameworks for Python, for example Chainer, Keras, Theano, TensorFlow and PyTorch.
I have tried Keras, Chainer and TensorFlow for QSAR modeling. This time I tried to build a QSAR model using PyTorch and RDKit.
As you know, PyTorch builds dynamic neural networks in a "Define-by-Run" manner, like Chainer.
I used the solubility data provided with RDKit, which I have used before.

Let's start coding.
First, I imported the packages needed for QSAR and defined some utility functions.

import pprint
import argparse
import torch
import torch.optim as optim
from torch import nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem import DataStructs
import numpy as np
#from sklearn import preprocessing


def base_parser():
    """Build the command-line parser for the QSAR script.

    Returns:
        An argparse.ArgumentParser with two positional arguments
        (``trainset`` and ``testset``, paths to SD files) and an optional
        integer ``--epochs`` (default 150).
    """
    # The text is a description; passing it positionally would set `prog` instead.
    parser = argparse.ArgumentParser(description="This is simple test of pytorch")
    parser.add_argument("trainset", help="sdf for train")
    parser.add_argument("testset", help="sdf for test")
    # type=int so a value given on the command line can be used with range().
    parser.add_argument("--epochs", type=int, default=150,
                        help="number of training epochs")
    return parser

# Parse the command line and load both SD files, skipping entries the supplier yields as None.
parser = base_parser()
args = parser.parse_args()
traindata = [mol for mol in Chem.SDMolSupplier(args.trainset) if mol is not None]
testdata = [mol for mol in Chem.SDMolSupplier(args.testset) if mol is not None]

def molsfeaturizer(mols, radius=2, n_bits=2048):
    """Convert molecules into a matrix of Morgan fingerprint bits.

    Args:
        mols: iterable of RDKit molecules.
        radius: Morgan fingerprint radius (default 2, as before).
        n_bits: fingerprint length; the default 2048 matches the input size
            of the network defined in this script.

    Returns:
        A float64 numpy array of shape (number of molecules, n_bits); an
        empty input gives an array of shape (0, n_bits).
    """
    fps = []
    for mol in mols:
        # ConvertToNumpyArray resizes the target array to the fingerprint length.
        arr = np.zeros((0,))
        fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=n_bits)
        DataStructs.ConvertToNumpyArray(fp, arr)
        fps.append(arr)
    if not fps:
        return np.zeros((0, n_bits), dtype=float)
    # The builtin float replaces the np.float alias, which was removed in NumPy 1.24.
    return np.array(fps, dtype=float)

# Map the SOL_classification property values to integer class labels.
classes = {"(A) low":0, "(B) medium":1, "(C) high":2}
#classes = {"(A) low":0, "(B) medium":1, "(C) high":1}

# Fingerprint matrices for the training and test molecules.
trainx = molsfeaturizer(traindata)
testx = molsfeaturizer(testdata)
# for pytorch, y must be long type!!
trainy = np.array([classes[mol.GetProp("SOL_classification")] for mol in traindata], dtype=np.int64)
testy = np.array([classes[mol.GetProp("SOL_classification")] for mol in testdata], dtype=np.int64)

The torch.from_numpy function converts a numpy array to a torch tensor, which is very convenient.
Then I defined the neural network. This way of defining a model feels quite unique to me, because I mostly use Keras for deep learning.
To build a model in PyTorch, I need to define each layer and then the whole structure (the forward pass).

# Wrap the numpy arrays as torch tensors (from_numpy keeps the numpy dtype).
X_train = torch.from_numpy(trainx)
X_test = torch.from_numpy(testx)
Y_train = torch.from_numpy(trainy)
Y_test = torch.from_numpy(testy)
# Print the feature/label shapes of each split.
print(X_train.size(),Y_train.size())
# Report the test labels next to the test features (Y_train was printed here by mistake).
print(X_test.size(), Y_test.size())

class QSAR_mlp(nn.Module):
    """Four-layer perceptron mapping a 2048-bit fingerprint to 3 class scores."""

    def __init__(self):
        super(QSAR_mlp, self).__init__()
        self.fc1 = nn.Linear(2048, 524)
        self.fc2 = nn.Linear(524, 10)
        self.fc3 = nn.Linear(10, 10)
        self.fc4 = nn.Linear(10,3)

    def forward(self, x):
        """Return raw class scores (logits) of shape (batch, 3).

        The input is reshaped to (-1, 2048). No activation is applied to the
        last layer: F.cross_entropy applies log-softmax itself, so squashing
        the scores with a sigmoid first would only compress them into (0, 1)
        and weaken the training signal. The arg-max class is unaffected.
        """
        x = x.view(-1, 2048)
        h1 = F.relu(self.fc1(x))
        h2 = F.relu(self.fc2(h1))
        h3 = F.relu(self.fc3(h2))
        return self.fc4(h3)

After defining the model, I tried training and prediction.
The following code is the training and prediction part.

model = QSAR_mlp()
print(model)

# Training loss of every epoch.
losses = []
optimizer = optim.Adam( model.parameters(), lr=0.005)
# Full-batch training: convert the data once, outside the loop.
# Tensors can be fed to the model directly; the Variable wrapper is no longer needed.
data, target = X_train.float(), Y_train.long()
model.train()
# int() keeps this working even when the epoch count arrives as a string from the command line.
for epoch in range(int(args.epochs)):
    optimizer.zero_grad()
    y_pred = model(data)
    loss = F.cross_entropy(y_pred, target)
    # .item() extracts the Python float; indexing a 0-dim tensor with [0] fails on PyTorch >= 0.4.
    loss_value = loss.item()
    losses.append(loss_value)
    print("Loss: {}".format(loss_value))
    loss.backward()
    optimizer.step()

# Prediction only: switch to eval mode and skip autograd bookkeeping.
model.eval()
with torch.no_grad():
    pred_y = model(X_test.float())
# Index of the highest score per row is the predicted class.
predicted = torch.max(pred_y, 1)[1]

for p, t in zip(predicted.tolist(), Y_test.tolist()):
    print("pred:{}, target:{}".format(p, t))

# Count the matches as a Python int so the accuracy is a plain float.
n_correct = (predicted == Y_test).sum().item()
print( "Accuracy: {}".format(n_correct / len(Y_test)))

Check the code.

iwatobipen$ python qsar_pytorch.py solubility.train.sdf solubility.test.sdf 
torch.Size([1025, 2048]) torch.Size([1025])
torch.Size([257, 2048]) torch.Size([1025])
QSAR_mlp(
  (fc1): Linear(in_features=2048, out_features=524)
  (fc2): Linear(in_features=524, out_features=10)
  (fc3): Linear(in_features=10, out_features=10)
  (fc4): Linear(in_features=10, out_features=3)
)
Loss: 1.1143544912338257
-snip-
Loss: 0.6231405735015869
pred:1, target:0
pred:1, target:0
-snip-
pred:0, target:0
Accuracy: 0.642023346303502

Hmm, the accuracy is not so high. I believe there is still room for improvement. I am a PyTorch newbie, so I will practice PyTorch more next year.

This is my last code of this year. I would like to post to my blog more next year.
If any readers find a mistake in my code, please let me know.

Have a happy new year !!!!
;-)