This article introduces TimeDART, a self-supervised time series forecasting method based on a diffusion autoregressive Transformer.
In recent years, advances in machine learning have made deep neural networks the dominant approach to time series forecasting, reflecting a sustained effort in both academia and industry to handle the complexity of sequential data with modern techniques.
Overview of Self-Supervised Learning
Basic Definition
Self-supervised learning is a learning paradigm in which a model learns from unlabeled data using internally generated supervision signals, typically through pretext tasks. Unlike traditional supervised learning, it requires no external labels; instead, it exploits the intrinsic structure of the data itself to create the learning signal.
Applications in the Time Series Domain
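As a concrete illustration (a minimal toy pretext task, not TimeDART's actual objective): mask random points of an unlabeled series and train a model to reconstruct them, so the supervision signal comes entirely from the data itself.

```python
import torch

torch.manual_seed(0)
# An unlabeled series: one sine period, shape [100, 1]
series = torch.sin(torch.linspace(0.0, 6.28, 100)).unsqueeze(-1)

# Pretext task: hide ~20% of the points, then reconstruct them
mask = torch.rand(series.size(0)) < 0.2
inputs = series.clone()
inputs[mask] = 0.0

model = torch.nn.Linear(1, 1)  # toy stand-in for a real encoder
loss = torch.nn.functional.mse_loss(model(inputs)[mask], series[mask])
loss.backward()  # gradients flow without any external labels
```

The point is only that a learning signal (and hence gradients) exists with no labels at all; a real method would use a far richer model and masking scheme.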
In time series analysis, self-supervised learning offers distinct advantages: it lets a model learn expressive representations from abundant unlabeled sequences.
However, this learning paradigm still faces significant challenges, which is precisely why an innovative approach like TimeDART is needed. By integrating diffusion and autoregressive modeling, TimeDART aims to address these fundamental challenges.
Problems with Existing Methods
- Models must understand and capture long-term temporal dependencies, yet traditional methods struggle to capture global patterns across a sequence.
- Models must also capture fine-grained local features precisely, yet existing methods perform poorly when handling both tasks at once.
These challenges severely limit a model's ability to learn comprehensive and expressive representations of time series data.
TimeDART Method in Detail
TimeDART is a self-supervised learning method designed specifically for time series forecasting. Its core idea is to improve the prediction of future data points by learning patterns from historical time series data. The authors decompose the time series into smaller segments (patches) and use these patches as the basic modeling unit.
Core Technical Components
- A Transformer encoder with a self-attention mechanism
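The patching step described above can be sketched with `torch.Tensor.unfold`; the sizes here are illustrative, not TimeDART's actual defaults.

```python
import torch

# Hypothetical sizes for illustration only
seq_len, patch_len, stride = 96, 8, 8   # stride == patch_len -> non-overlapping patches
x = torch.randn(32, 7, seq_len)         # [batch, num_features, seq_len]

# unfold slices the last dimension into windows of size patch_len
patches = x.unfold(dimension=-1, size=patch_len, step=stride)
print(patches.shape)                    # torch.Size([32, 7, 12, 8])
```

Each of the 12 patches then becomes one token for the Transformer encoder, so attention operates over patches rather than raw time steps.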
TimeDART Architecture in Detail
Instance Normalization and Patch Embedding
Inter-Patch Dependencies in the Transformer Encoder
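A minimal sketch of this stage, assuming non-overlapping patches and a linear projection; the class and parameter names here are illustrative, not the repo's actual API.

```python
import torch
import torch.nn as nn

class PatchEmbedding(nn.Module):
    """Instance-normalize each series, then project patches to d_model."""

    def __init__(self, patch_len: int, d_model: int):
        super().__init__()
        self.patch_len = patch_len
        self.proj = nn.Linear(patch_len, d_model)

    def forward(self, x):  # x: [batch, num_features, seq_len]
        # Instance normalization: zero mean, unit variance per series
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True) + 1e-5
        x = (x - mean) / std
        # Split into non-overlapping patches and embed each one
        patches = x.unfold(-1, self.patch_len, self.patch_len)
        return self.proj(patches)  # [batch, num_features, num_patches, d_model]

emb = PatchEmbedding(patch_len=8, d_model=64)
out = emb(torch.randn(4, 7, 96))
print(out.shape)  # torch.Size([4, 7, 12, 64])
```

Instance normalization stabilizes training across series with very different scales; the stored mean and std can be reused to de-normalize forecasts.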
- By modeling the relationships between different patches of the series
- The Transformer encoder learns meaningful inter-patch representations
import torch
import torch.nn as nn

class TransformerEncoderBlock(nn.Module):
    def __init__(
        self, d_model: int, num_heads: int, feedforward_dim: int, dropout: float
    ):
        super(TransformerEncoderBlock, self).__init__()
        # Masked self-attention over patches
        self.attention = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True
        )
        self.norm1 = nn.LayerNorm(d_model)
        # Position-wise feed-forward network
        self.ff = nn.Sequential(
            nn.Linear(d_model, feedforward_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(feedforward_dim, d_model),
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """
        :param x: [batch_size * num_features, seq_len, d_model]
        :param mask: [seq_len, seq_len] attention mask
        :return: [batch_size * num_features, seq_len, d_model]
        """
        # Self-attention with residual connection and layer norm
        attn_output, _ = self.attention(x, x, x, attn_mask=mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Feed-forward with residual connection and layer norm
        ff_output = self.ff(x)
        output = self.norm2(x + self.dropout(ff_output))
        return output
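To see how a causal mask confines attention to past patches, here is a standalone demonstration with `nn.MultiheadAttention`; `generate_causal_mask` is a hypothetical helper written for this example (the repo's equivalent may differ).

```python
import torch
import torch.nn as nn

def generate_causal_mask(seq_len: int) -> torch.Tensor:
    # Hypothetical helper: True above the diagonal = positions that
    # may NOT be attended to (i.e., the future is blocked)
    return torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1)

attn = nn.MultiheadAttention(embed_dim=16, num_heads=4, batch_first=True)
x = torch.randn(2, 5, 16)                 # [batch, seq_len, d_model]
mask = generate_causal_mask(5)
out, weights = attn(x, x, x, attn_mask=mask, average_attn_weights=True)

# Every attention weight on a future position is exactly zero
print(torch.allclose(weights.triu(1), torch.zeros_like(weights)))  # True
```

With a boolean `attn_mask`, `True` entries are excluded before the softmax, so each patch representation depends only on itself and earlier patches, which is what makes autoregressive pretraining possible.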
The Forward Diffusion Process
class Diffusion(nn.Module):
    def __init__(
        self,
        time_steps: int,
        device: torch.device,
        scheduler: str = "cosine",
    ):
        super(Diffusion, self).__init__()
        self.device = device
        self.time_steps = time_steps
        if scheduler == "cosine":
            self.betas = self._cosine_beta_schedule().to(self.device)
        elif scheduler == "linear":
            self.betas = self._linear_beta_schedule().to(self.device)
        else:
            raise ValueError(f"Invalid scheduler: {scheduler=}")
        self.alpha = 1 - self.betas
        # gamma_t = prod_{s<=t} alpha_s: the cumulative signal-retention ratio
        self.gamma = torch.cumprod(self.alpha, dim=0).to(self.device)

    def _cosine_beta_schedule(self, s=0.008):
        steps = self.time_steps + 1
        x = torch.linspace(0, self.time_steps, steps)
        alphas_cumprod = (
            torch.cos(((x / self.time_steps) + s) / (1 + s) * torch.pi * 0.5) ** 2
        )
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return torch.clip(betas, 0, 0.999)

    def _linear_beta_schedule(self, beta_start=1e-4, beta_end=0.02):
        betas = torch.linspace(beta_start, beta_end, self.time_steps)
        return betas

    def sample_time_steps(self, shape):
        # Sample an independent diffusion step for every patch
        return torch.randint(0, self.time_steps, shape, device=self.device)

    def noise(self, x, t):
        noise = torch.randn_like(x)
        gamma_t = self.gamma[t].unsqueeze(-1)  # [batch_size * num_features, seq_len, 1]
        # x_t = sqrt(gamma_t) * x + sqrt(1 - gamma_t) * noise
        noisy_x = torch.sqrt(gamma_t) * x + torch.sqrt(1 - gamma_t) * noise
        return noisy_x, noise

    def forward(self, x):
        # x: [batch_size * num_features, seq_len, patch_len]
        t = self.sample_time_steps(x.shape[:2])  # [batch_size * num_features, seq_len]
        noisy_x, noise = self.noise(x, t)
        return noisy_x, noise, t
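The noising equation x_t = sqrt(γ_t)·x + sqrt(1−γ_t)·ε can be checked in isolation; this standalone snippet mirrors the class above with a linear beta schedule and the same illustrative shapes.

```python
import torch

# Linear beta schedule, as in _linear_beta_schedule above
time_steps = 1000
betas = torch.linspace(1e-4, 0.02, time_steps)
gamma = torch.cumprod(1 - betas, dim=0)   # cumulative signal-retention ratio

x = torch.randn(8, 12, 16)                # [batch * num_features, seq_len, patch_len]
t = torch.randint(0, time_steps, x.shape[:2])  # one diffusion step per patch
noise = torch.randn_like(x)

gamma_t = gamma[t].unsqueeze(-1)          # broadcast over patch_len
noisy_x = torch.sqrt(gamma_t) * x + torch.sqrt(1 - gamma_t) * noise

print(noisy_x.shape)                      # torch.Size([8, 12, 16])
print(bool(gamma[0] > gamma[-1]))         # True: signal decays as t grows
```

Because each patch draws its own time step t, a single training batch mixes lightly and heavily corrupted patches, which is what lets the denoising decoder learn across all noise levels at once.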
A Cross-Attention Denoising Decoder
- A mask ensures that the j-th noisy input corresponds to the j-th output of the Transformer encoder
class TransformerDecoderBlock(nn.Module):
    def __init__(
        self, d_model: int, num_heads: int, feedforward_dim: int, dropout: float
    ):
        super(TransformerDecoderBlock, self).__init__()
        self.self_attention = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True
        )
        self.norm1 = nn.LayerNorm(d_model)
        # Cross-attention from noisy patches to encoder outputs
        self.encoder_attention = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, feedforward_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(feedforward_dim, d_model),
        )
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, query, key, value, tgt_mask, src_mask):
        """
        :param query: [batch_size * num_features, seq_len, d_model]
        :param key: [batch_size * num_features, seq_len, d_model]
        :param value: [batch_size * num_features, seq_len, d_model]
        :param tgt_mask: [seq_len, seq_len] mask for self-attention
        :param src_mask: [seq_len, seq_len] mask for cross-attention
        :return: [batch_size * num_features, seq_len, d_model]
        """
        # Self-attention
        attn_output, _ = self.self_attention(query, query, query, attn_mask=tgt_mask)
        query = self.norm1(query + self.dropout(attn_output))
        # Encoder (cross-)attention
        attn_output, _ = self.encoder_attention(query, key, value, attn_mask=src_mask)
        query = self.norm2(query + self.dropout(attn_output))
        # Feed-forward network
        ff_output = self.ff(query)
        x = self.norm3(query + self.dropout(ff_output))
        return x
Autoregressive Generation for Global Dependencies
class DenoisingPatchDecoder(nn.Module):
    def __init__(
        self,
        d_model: int,
        num_heads: int,
        num_layers: int,
        feedforward_dim: int,
        dropout: float,
    ):
        super(DenoisingPatchDecoder, self).__init__()
        self.layers = nn.ModuleList(
            [
                TransformerDecoderBlock(d_model, num_heads, feedforward_dim, dropout)
                for _ in range(num_layers)
            ]
        )
        self.norm = nn.LayerNorm(d_model)

    def forward(self, query, key, value, is_tgt_mask=True, is_src_mask=True):
        seq_len = query.size(1)
        # generate_self_only_mask (defined elsewhere in the repo) restricts
        # each position to attend only to its own index
        tgt_mask = (
            generate_self_only_mask(seq_len).to(query.device) if is_tgt_mask else None
        )
        src_mask = (
            generate_self_only_mask(seq_len).to(query.device) if is_src_mask else None
        )
        for layer in self.layers:
            query = layer(query, key, value, tgt_mask, src_mask)
        x = self.norm(query)
        return x
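The helper `generate_self_only_mask` is not shown in this excerpt; a plausible sketch consistent with its name and the masking described earlier (the j-th noisy input pairs with the j-th encoder output) is a diagonal-only attention mask. The actual repo implementation may differ.

```python
import torch

def generate_self_only_mask(seq_len: int) -> torch.Tensor:
    # Sketch under stated assumptions: True entries are blocked,
    # so every position may attend only to its own index
    return ~torch.eye(seq_len, dtype=torch.bool)

mask = generate_self_only_mask(4)
print(mask.int())
# tensor([[0, 1, 1, 1],
#         [1, 0, 1, 1],
#         [1, 1, 0, 1],
#         [1, 1, 1, 0]])
```

Passed as `attn_mask` to `nn.MultiheadAttention`, this forces a strict one-to-one alignment between noisy patches and encoder outputs, preventing the decoder from leaking information across positions during denoising.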
class ForecastingHead(nn.Module):
    def __init__(
        self,
        seq_len: int,
        d_model: int,
        pred_len: int,
        dropout: float,
    ):
        super(ForecastingHead, self).__init__()
        self.pred_len = pred_len
        self.flatten = nn.Flatten(start_dim=-2)
        self.forecast_head = nn.Linear(seq_len * d_model, pred_len)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        :param x: [batch_size, num_features, seq_len, d_model]
        :return: [batch_size, pred_len, num_features]
        """
        x = self.flatten(x)        # (batch_size, num_features, seq_len * d_model)
        x = self.forecast_head(x)  # (batch_size, num_features, pred_len)
        x = self.dropout(x)        # (batch_size, num_features, pred_len)
        x = x.permute(0, 2, 1)     # (batch_size, pred_len, num_features)
        return x
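The head's shape transformations can be traced step by step without the class itself; the sizes below are illustrative.

```python
import torch
import torch.nn as nn

# Illustrative sizes for the forecasting head's shape walkthrough
batch, num_features, seq_len, d_model, pred_len = 4, 7, 12, 64, 96
x = torch.randn(batch, num_features, seq_len, d_model)

flat = nn.Flatten(start_dim=-2)(x)      # [4, 7, 12 * 64]: merge patches and channels
head = nn.Linear(seq_len * d_model, pred_len)
y = head(flat)                          # [4, 7, 96]: one forecast per feature
y = y.permute(0, 2, 1)                  # [4, 96, 7]: time-major output layout
print(y.shape)                          # torch.Size([4, 96, 7])
```

Flattening all patch representations into one vector per feature lets a single linear layer see the entire encoded history when producing the forecast, a common design in patch-based forecasters.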
Optimization and Fine-Tuning
Experimental Evaluation
Datasets
- The four ETT subsets: ETTh1, ETTh2, ETTm1, ETTm2
These datasets cover multiple application domains, including power systems, traffic networks, and weather forecasting.
Analysis of Experimental Results
- "#1 Counts" denotes the number of times a method achieves the best result
- Results are shown for pretraining on five datasets and fine-tuning on a specific dataset
- All results are averaged over four prediction horizons {96, 192, 336, 720}
Hyperparameter Sensitivity Analysis
Forward-Process Parameters
Number of Denoising Patch Decoder Layers
Effect of Patch Length
- Larger patch lengths may suit datasets with high redundancy
Conclusion
By innovatively combining diffusion models with autoregressive modeling, TimeDART addresses key challenges in time series forecasting.
TimeDART's success shows that combining different generative approaches can effectively improve time series forecasting performance, pointing to new directions for further research in this area.
Paper: https://arxiv.org/abs/2410.05711
Data Pie THU (数据派THU) is a data science public account backed by Tsinghua University's Big Data Research Center, sharing cutting-edge research on data science and big data technology and spreading data science knowledge.