PyG-Data和Dataset篇
⽬录
系统需求
要求⾄少安装 PyTorch 1.2.0 版本。
$ python -c "import torch; print(torch.__version__)"
>>> 1.6.0
PyTorch Geometric 基础知识
热交换设备这⼀部分我们介绍⼀下 PyG 的基础知识,主要包括 torch_geometric.data 和 部分。另外,还会介绍怎么设计⾃⼰的 Message Passing Layer。
Data 类
torch_geometric.data 包⾥有⼀个 Data 类,通过 Data 类我们可以很⽅便的创建图结构。
2. 边的连接关系或者边的 features
我们以下⾯的图结构为例,看看怎么⽤ Data 类创建图结构:
图的节点x可以根据其值进⾏向量表⽰,⽽节点与节点间使⽤邻接矩阵(这⾥⽤的边表edge_index)来表⽰。
邻接矩阵dege_index主要由源节点(第⼀列)和⽬标节点(第⼆列)组成。源节点和⽬标节点顺序对应。⽐如 ,在图中,节点0的⽬标节点(指向的点)有节点1和节点3,因此节点0可以⽤[[0,0],[1,3]]来表⽰。所以,邻接矩阵的关键是,源节点列和⽬标节点列的对应关系表⽰。
在上图中,⼀共有四个节点 v1,v2,v3,v4,其中每个节点都有⼀个⼆维的特征向量和⼀个标签 y。这个特征向量和标签可以
⽤ FloatTensor 来表⽰:
x = sor([[2,1], [5,6], [3,7], [12,0]], dtype=torch.float)
y = sor([0, 1, 0, 1], dtype=torch.float)
图的连接关系(边)可以⽤ COO 格式表⽰。COO 格式的维度是 [2, num_edges],其中第⼀个列表是所有边上起始节点的 index,第⼆个列表是对应边上⽬标节点的 index:
edge_index = sor([[0, 1, 2, 0, 3],
[1, 0, 1, 3, 2]], dtype=torch.long)
注意上⾯的数据⾥定义边的顺序是⽆关紧要的,这个数据仅仅⽤来计算邻接矩阵⽤的,⽐如上⾯的定义和下⾯的定义是等价的: 三维激光扫描系统edge_index = sor([[0, 2, 1, 0, 3],
[3, 1, 0, 1, 2]], dtype=torch.long)
综上所述,我们可以这样定义上⾯的图结构:
import torch
from torch_geometric.data import Data
x = sor([[2,1], [5,6], [3,7], [12,0]], dtype=torch.float)
y = sor([0, 1, 0, 1], dtype=torch.float)
edge_index = sor([[0, 2, 1, 0, 3],
[3, 1, 0, 1, 2]], dtype=torch.long)
data = Data(x=x, y=y, edge_index=edge_index)
>>> Data(edge_index=[2, 5], x=[4, 2], y=[4])
湖水净化
Dataset
PyG ⾥有两种数据集类型:InMemoryDataset 和 Dataset,第⼀种适⽤于可以全部放进内存中的⼩数据集,第⼆种则适⽤于不能⼀次性放进内存中的⼤数据集。我们以 InMemoryDataset 为例。
InMemoryDataset 中有下列四个函数需要我们实现: raw_file_names()
返回⼀个包含所有未处理过的数据⽂件的⽂件名的列表。
起始也可以返回⼀个空列表,然后在后⾯要说的 process() 函数⾥再定义。
processed_file_names()
返回⼀个包含所有处理过的数据⽂件的⽂件名的列表。
download()
如果在数据加载前需要先下载,则在这⾥定义下载过程,下载到 self.raw_dir 中定义的⽂件夹位置。
如果不需要下载,返回 pass 即可。
process()
这是最重要的⼀个函数,我们需要在这个函数⾥把数据处理成⼀个 Data 对象。下⾯是官⽅的⼀个⽰例代码:
import torch
from torch_geometric.data import InMemoryDataset
class MyOwnDataset(InMemoryDataset):
def __init__(self, root, transform=None, pre_transform=None):
super(MyOwnDataset, self).__init__(root, transform, pre_transform)
self.data, self.slices = torch.load(self.processed_paths[0])
@property
def raw_file_names(self):
return ['some_file_1', 'some_file_2', ...]
@property
def processed_file_names(self):
return ['data.pt']
def download(self):
# Download to `self.raw_dir`.
def process(self):
# Read data into huge `Data` list.
data_list = [...]
还原炉if self.pre_filter is not None:
data_list [data for data in data_list if self.pre_filter(data)]
if self.pre_transform is not None:
data_list = [self.pre_transform(data) for data in data_list]
data, slices = llate(data_list)
torch.save((data, slices), self.processed_paths[0])
本⽂接下来会介绍如何⽤ RecSys Challenge 2015 的数据创建⼀个⾃定义数据集。
DataLoader
这个类可以帮助我们将数据按 batch 传给 model,定义的⽅法如下,需要制定 batch_size 和 dataset:loader = DataLoader(dataset, batch_size=512, shuffle=True)
每个 loader 的循环都返回⼀个 Batch 对象:
for batch in loader:
batch
>>> Batch(x=[1024, 21], edge_index=[2, 1568], y=[512], batch=[1024])
Batch 相⽐ Data 对象多了⼀个 batch 参数,告诉我们这个 batch ⾥都包含哪些 nodes,便于计算。MessagePassing
图神经⽹络定义:节点i的值是他相邻的节点加权和它上⼀轮的值之和(可以理解为update阶段)。
Message Passing 的公⽰如下:
其中,x 表⽰节点的 embedding,e 表⽰边的特征,ϕ 表⽰ message 函数,□ 表⽰聚合 aggregation 函数,γ 表⽰ update 函数。上标表⽰层的 index,⽐如说,当 k = 1 时,x 则表⽰所有输⼊⽹络的图结构的数据。
下⾯是每个函数的介绍:
propagate(edge_index, size=None, **kwargs)
这个函数最终会调⽤ message 和 update 函数。
message(**kwargs)
这个函数定义了对于每个节点对 (xi,xj),怎样⽣成信息(message)。
update(aggr_out, **kwargs)
这个函数利⽤聚合好的信息(message)更新每个节点的 embedding。
⽰例:SageConv
聚合函数(aggregation)我们⽤最⼤池化(max pooling),这样上述公⽰中的 AGGREGATE 可以写为:
上述公式中,对于每个邻居节点,都和⼀个 weighted matrix 相乘,并且加上⼀个 bias,传给⼀个激活函数。相关代码如下:
class SAGEConv(MessagePassing):
def __init__(self, in_channels, out_channels):
super(SAGEConv, self).__init__(aggr='max')
self.lin = Linear(in_channels, out_channels)
EMSKD
self.act = ReLU()
def message(self, x_j):
# x_j has shape [E, in_channels]
x_j = self.lin(x_j)
x_j = self.act(x_j)
return x_j
对于 update ⽅法,我们需要聚合更新每个节点的 embedding,然后加上权重矩阵和偏置:
class SAGEConv(MessagePassing):
def __init__(self, in_channels, out_channels):
super(SAGEConv, self).__init__(aggr='max')
self.update_lin = Linear(in_channels + out_channels, in_channels, bias=False) self.update_act = ReLU()
def update(self, aggr_out, x):
# aggr_out has shape [N, out_channels]
new_embedding = torch.cat([aggr_out, x], dim=1)
new_embedding = self.update_lin(new_embedding)
new_embedding = torch.update_act(new_embedding)
return new_embedding
综上所述,SageConv 层的定于⽅法如下:
import torch
import Sequential as Seq, Linear, ReLU
from import MessagePassing
from torch_geometric.utils import remove_self_loops, add_self_loops
class SAGEConv(MessagePassing):
def __init__(self, in_channels, out_channels):
super(SAGEConv, self).__init__(aggr='max') # "Max" aggregation.
self.lin = Linear(in_channels, out_channels)
self.act = ReLU()
self.update_lin = Linear(in_channels + out_channels, in_channels, bias=False) self.update_act = ReLU()
def forward(self, x, edge_index):
# x has shape [N, in_channels]
# edge_index has shape [2, E]
edge_index, _ = remove_self_loops(edge_index)
edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))
return self.propagate(edge_index, size=(x.size(0), x.size(0)), x=x)
def message(self, x_j):
# x_j has shape [E, in_channels]
x_j = self.lin(x_j)
成上上网x_j = self.act(x_j)
return x_j
def update(self, aggr_out, x):
# aggr_out has shape [N, out_channels]
new_embedding = torch.cat([aggr_out, x], dim=1)
new_embedding = self.update_lin(new_embedding)
new_embedding = self.update_act(new_embedding)
return new_embedding