我们现在必须为可重复性设置很多种子,所以让我们将它们全部包装在一个函数中。
- import numpy as np
- import pandas as pd
- import random
- import torch
- import torch.nn as nn
SEED = 1234
def set_seeds(seed=1234):
    """Seed every random number generator used in this walkthrough.

    Seeds NumPy, Python's ``random`` module and PyTorch (CPU and CUDA
    generators) with the same value.

    Args:
        seed: Integer seed applied to all generators.
    """
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # multi-GPU
# Set seeds for reproducibility
set_seeds(seed=SEED)

# Set device: use CUDA only when it is both available and enabled via `cuda`
cuda = True
device = torch.device(
    "cuda" if (torch.cuda.is_available() and cuda) else "cpu")

# NOTE(review): torch.set_default_tensor_type is reported as deprecated in
# recent PyTorch releases - confirm against the installed version before
# migrating to torch.set_default_dtype / torch.set_default_device.
torch.set_default_tensor_type("torch.FloatTensor")
if device.type == "cuda":
    torch.set_default_tensor_type("torch.cuda.FloatTensor")
print(device)
我们将使用之前课程中相同的螺旋数据集来演示我们的实用程序。
- import matplotlib.pyplot as plt
- import pandas as pd
# Load data
url = "https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/spiral.csv"
df = pd.read_csv(url, header=0)  # load
df = df.sample(frac=1).reset_index(drop=True)  # shuffle
df.head()
| X1 | X2 | color | |
|---|---|---|---|
| 0 | 0.106737 | 0.114197 | c1 |
| 1 | 0.311513 | -0.664028 | c1 |
| 2 | 0.019870 | -0.703126 | c1 |
| 3 | -0.054017 | 0.508159 | c3 |
| 4 | -0.127751 | -0.011382 | c3 |
# Data shapes: features come from the X1/X2 columns, labels from `color`
X = df[["X1", "X2"]].values
y = df["color"].values
print("X: ", np.shape(X))
print("y: ", np.shape(y))
- X: (1500, 2)
- y: (1500,)
# Visualize data: one scatter point per sample, colored by its class label
plt.title("Generated non-linear data")
colors = {"c1": "red", "c2": "yellow", "c3": "blue"}
plt.scatter(X[:, 0], X[:, 1], c=[colors[_y] for _y in y], edgecolors="k", s=25)
plt.show()

- import collections
- from sklearn.model_selection import train_test_split
# Fractions of the dataset assigned to each split
TRAIN_SIZE = 0.7
VAL_SIZE = 0.15
TEST_SIZE = 0.15
def train_val_test_split(X, y, train_size):
    """Split a dataset into stratified train, validation and test splits.

    The samples left over after taking the training portion are divided
    evenly between the validation and test splits.

    Args:
        X: Input features.
        y: Labels aligned with ``X``; also used for stratification.
        train_size: Portion of the data assigned to the training split
            (passed through to ``train_test_split``).

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    # Honor the caller-supplied `train_size` rather than the module-level
    # TRAIN_SIZE constant, which silently ignored the argument.
    X_train, X_, y_train, y_ = train_test_split(
        X, y, train_size=train_size, stratify=y)
    X_val, X_test, y_val, y_test = train_test_split(
        X_, y_, train_size=0.5, stratify=y_)
    return X_train, X_val, X_test, y_train, y_val, y_test
# Create data splits
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
    X=X, y=y, train_size=TRAIN_SIZE)
print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_val: {X_val.shape}, y_val: {y_val.shape}")
print(f"X_test: {X_test.shape}, y_test: {y_test.shape}")
print(f"Sample point: {X_train[0]} → {y_train[0]}")
- X_train: (1050, 2), y_train: (1050,)
- X_val: (225, 2), y_val: (225,)
- X_test: (225, 2), y_test: (225,)
- Sample point: [-0.63919105 -0.69724176] → c1
接下来,我们将定义一个 `LabelEncoder`,把文本标签编码为唯一的索引。我们不再使用 scikit-learn 的 LabelEncoder,因为我们希望能够按照自己想要的方式保存和加载编码器实例。
import itertools
class LabelEncoder(object):
    """Map class labels to integer indices and back.

    Attributes are kept in sync by ``__init__`` and ``fit``:
    ``class_to_index`` (label -> index), ``index_to_class`` (index -> label)
    and ``classes`` (list of labels).
    """

    def __init__(self, class_to_index=None):
        """Create an encoder, optionally from an existing label -> index map.

        Args:
            class_to_index: Optional mapping of label to index. It is copied,
                so later calls to ``fit`` never mutate the caller's dict.
        """
        self.class_to_index = dict(class_to_index) if class_to_index else {}
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        self.classes = list(self.class_to_index.keys())

    def __len__(self):
        """Return the number of known classes."""
        return len(self.class_to_index)

    def __str__(self):
        """Return a short summary showing the number of classes."""
        return f"<LabelEncoder(num_classes={len(self)})>"

    def fit(self, y):
        """Learn the label -> index mapping from the unique values in ``y``.

        The mapping is rebuilt from scratch so stale labels from an earlier
        fit cannot end up sharing an index with a new label.

        Args:
            y: Iterable of labels.

        Returns:
            The encoder itself.
        """
        # tolist() yields native Python values, which keeps keys JSON-friendly.
        classes = np.unique(y).tolist()
        self.class_to_index = {class_: i for i, class_ in enumerate(classes)}
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        self.classes = list(self.class_to_index.keys())
        return self

    def encode(self, y):
        """Return an integer NumPy array with the index of each label in ``y``.

        Raises:
            KeyError: If a label was not seen during ``fit``.
        """
        return np.array([self.class_to_index[item] for item in y], dtype=int)

    def decode(self, y):
        """Return the list of labels corresponding to the indices in ``y``.

        Raises:
            KeyError: If an index is not part of the mapping.
        """
        return [self.index_to_class[item] for item in y]

    def save(self, fp):
        """Write the label -> index mapping to ``fp`` as JSON."""
        import json  # local import: the module-level import appears later in the file

        with open(fp, "w") as f:
            contents = {"class_to_index": self.class_to_index}
            json.dump(contents, f, indent=4, sort_keys=False)

    @classmethod
    def load(cls, fp):
        """Build an encoder from a JSON file previously written by ``save``."""
        import json

        with open(fp, "r") as f:
            kwargs = json.load(fp=f)
        return cls(**kwargs)
# Encode: learn the label -> index mapping from the training labels only
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
label_encoder.class_to_index
{"c1": 0, "c2": 1, "c3": 2}
# Convert labels to tokens (print the first label before and after encoding)
print(f"y_train[0]: {y_train[0]}")
y_train = label_encoder.encode(y_train)
y_val = label_encoder.encode(y_val)
y_test = label_encoder.encode(y_test)
print(f"y_train[0]: {y_train[0]}")
- y_train[0]: c1
- y_train[0]: 0
# Class weights: inverse of each class's frequency in the training labels
counts = np.bincount(y_train)
class_weights = {i: 1.0/count for i, count in enumerate(counts)}
print(f"counts: {counts}\nweights: {class_weights}")
- counts: [350 350 350]
- weights: {0: 0.002857142857142857, 1: 0.002857142857142857, 2: 0.002857142857142857}
我们需要标准化我们的数据(零均值和单位方差),以便特定特征的大小不会影响模型如何学习其权重。我们只会对输入 X 进行标准化,因为我们的输出 y 是类值。我们将编写自己的StandardScaler类,以便稍后在推理过程中轻松保存和加载它。
class StandardScaler(object):
    """Standardize features using a per-column mean and standard deviation.

    The statistics can be learned with ``fit`` or supplied directly (which is
    what ``load`` does), and are stored as NumPy arrays in ``mean``/``std``.
    """

    def __init__(self, mean=None, std=None):
        """Create a scaler, optionally with precomputed statistics."""
        self.mean = np.array(mean)
        self.std = np.array(std)

    def fit(self, X):
        """Compute the column-wise mean and standard deviation of ``X``.

        Args:
            X: Array of samples; statistics are taken along axis 0.

        Returns:
            The scaler itself.
        """
        # Use the argument, not the module-level X_train the original read.
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)
        return self

    def scale(self, X):
        """Return ``(X - mean) / std``."""
        return (X - self.mean) / self.std

    def unscale(self, X):
        """Return ``(X * std) + mean``, the inverse of ``scale``."""
        return (X * self.std) + self.mean

    def save(self, fp):
        """Write the mean and std to ``fp`` as JSON lists."""
        import json  # local import: the module-level import appears later in the file

        with open(fp, "w") as f:
            contents = {"mean": self.mean.tolist(), "std": self.std.tolist()}
            json.dump(contents, f, indent=4, sort_keys=False)

    @classmethod
    def load(cls, fp):
        """Build a scaler from a JSON file previously written by ``save``."""
        import json

        with open(fp, "r") as f:
            kwargs = json.load(fp=f)
        return cls(**kwargs)
# Standardize the data (mean=0, std=1) using training data
X_scaler = StandardScaler()
X_scaler.fit(X_train)

# Apply scaler on training and test data (don't standardize outputs for classification)
X_train = X_scaler.scale(X_train)
X_val = X_scaler.scale(X_val)
X_test = X_scaler.scale(X_test)

# Check (means should be ~0 and std should be ~1)
print(f"X_test[0]: mean: {np.mean(X_test[:, 0], axis=0):.1f}, std: {np.std(X_test[:, 0], axis=0):.1f}")
print(f"X_test[1]: mean: {np.mean(X_test[:, 1], axis=0):.1f}, std: {np.std(X_test[:, 1], axis=0):.1f}")
- X_test[0]: mean: 0.1, std: 0.9
- X_test[1]: mean: 0.0, std: 1.0
我们将把数据放入一个 `Dataset`,并使用 `DataLoader` 高效地创建用于训练和评估的批次。
- import torch
-
# Set seed for reproducibility
torch.manual_seed(SEED)
class Dataset(torch.utils.data.Dataset):
    """Dataset of ``(X, y)`` pairs that can also build its own DataLoader."""

    def __init__(self, X, y):
        """Store the inputs ``X`` and targets ``y`` (indexed in parallel)."""
        self.X = X
        self.y = y

    def __len__(self):
        """Return the number of samples (length of ``y``)."""
        return len(self.y)

    def __str__(self):
        """Return a short summary showing the number of samples."""
        return f"<Dataset(N={len(self)})>"

    def __getitem__(self, index):
        """Return the ``[X, y]`` pair stored at ``index``."""
        X = self.X[index]
        y = self.y[index]
        return [X, y]

    def collate_fn(self, batch):
        """Turn a list of ``[X, y]`` samples into a pair of batch tensors.

        Args:
            batch: List of samples as returned by ``__getitem__``.

        Returns:
            Tuple ``(X, y)`` with ``X`` as a float32 tensor and ``y`` as an
            int64 tensor.
        """
        # Gather inputs and targets separately: packing the ragged [X, y]
        # pairs into one ndarray is rejected by recent NumPy versions.
        X = np.stack([sample[0] for sample in batch], axis=0)
        y = np.array([sample[1] for sample in batch])

        # Cast
        X = torch.from_numpy(X.astype(np.float32))
        y = torch.from_numpy(y.astype(np.int64))

        return X, y

    def create_dataloader(self, batch_size, shuffle=False, drop_last=False):
        """Return a DataLoader over this dataset that uses ``collate_fn``.

        Args:
            batch_size: Number of samples per batch.
            shuffle: Whether the loader reshuffles the data.
            drop_last: Whether to drop a final incomplete batch.
        """
        return torch.utils.data.DataLoader(
            dataset=self, batch_size=batch_size, collate_fn=self.collate_fn,
            shuffle=shuffle, drop_last=drop_last,
            # Pinning only makes sense when there is a CUDA device to copy to.
            pin_memory=torch.cuda.is_available())
这里其实并不需要 `collate_fn`,但我们希望把它明确写出来,因为当需要对批次做特定处理(例如填充)时就会用到它。
# Create datasets
train_dataset = Dataset(X=X_train, y=y_train)
val_dataset = Dataset(X=X_val, y=y_val)
test_dataset = Dataset(X=X_test, y=y_test)
print("Datasets:\n"
      f" Train dataset:{train_dataset.__str__()}\n"
      f" Val dataset: {val_dataset.__str__()}\n"
      f" Test dataset: {test_dataset.__str__()}\n"
      "Sample point:\n"
      f" X: {train_dataset[0][0]}\n"
      f" y: {train_dataset[0][1]}")
到目前为止,我们使用批量梯度下降来更新我们的权重。这意味着我们使用整个训练数据集计算了梯度。我们也可以使用随机梯度下降 (SGD) 更新我们的权重,我们一次传入一个训练示例。当前的标准是小批量梯度下降,它在批量和 SGD 之间取得平衡,我们使用 n ( BATCH_SIZE) 个样本的小批量更新权重。这是DataLoader对象派上用场的地方。
# Create dataloaders
batch_size = 64
train_dataloader = train_dataset.create_dataloader(batch_size=batch_size)
val_dataloader = val_dataset.create_dataloader(batch_size=batch_size)
test_dataloader = test_dataset.create_dataloader(batch_size=batch_size)

# Peek at the first training batch
batch_X, batch_y = next(iter(train_dataloader))
print("Sample batch:\n"
      f" X: {list(batch_X.size())}\n"
      f" y: {list(batch_y.size())}\n"
      "Sample point:\n"
      f" X: {batch_X[0]}\n"
      f" y: {batch_y[0]}")
- Sample batch:
- X: [64, 2]
- y: [64]
- Sample point:
- X: tensor([-1.4736, -1.6742])
- y: 0
到目前为止,我们一直在 CPU 上运行操作,但当数据集和模型变大时,在 GPU 上并行执行张量运算会带来明显收益。在此笔记本中,您可以在下拉菜单中依次选择 Runtime > Change runtime type,并在 Hardware accelerator 中选择 GPU 来使用 GPU。我们可以用以下代码查看当前使用的设备:
# Set CUDA seeds
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)  # multi-GPU

# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
让我们初始化我们将用来展示训练实用程序功能的模型。
- import math
- from torch import nn
- import torch.nn.functional as F
# Model and training hyperparameters
INPUT_DIM = X_train.shape[1]  # 2D
HIDDEN_DIM = 100
DROPOUT_P = 0.1
NUM_CLASSES = len(label_encoder.classes)
NUM_EPOCHS = 10
class MLP(nn.Module):
    """Two-layer perceptron: linear -> ReLU -> dropout -> linear."""

    def __init__(self, input_dim, hidden_dim, dropout_p, num_classes):
        """Build the layers.

        Args:
            input_dim: Number of input features.
            hidden_dim: Width of the hidden layer.
            dropout_p: Dropout probability applied after the hidden layer.
            num_classes: Number of output logits.
        """
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_p)
        self.fc2 = nn.Linear(hidden_dim, num_classes)

    def forward(self, inputs):
        """Return the logits for a batch.

        Args:
            inputs: Sequence holding exactly one element, the input tensor
                (the training utilities pass ``batch[:-1]``).
        """
        x_in, = inputs  # unpack the single-element sequence
        z = F.relu(self.fc1(x_in))
        z = self.dropout(z)
        z = self.fc2(z)
        return z
# Initialize model
model = MLP(
    input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
    dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model = model.to(device)  # set device
print(model.named_parameters)
到目前为止,我们一直在编写仅使用训练数据拆分进行训练的训练循环,然后我们对测试集进行评估。但实际上,我们会遵循这个过程:
我们将创建一个Trainer类来组织所有这些过程。
该类中的第一个函数train_step将使用来自训练数据拆分的一个时期的批次训练模型。
def train_step(self, dataloader):
    """Train the model for one pass over ``dataloader``.

    Args:
        dataloader: Iterable of batches whose last item is the targets and
            whose preceding items are the model inputs.

    Returns:
        Running mean of the per-batch loss as a float.
    """
    # Set model to train mode
    self.model.train()
    loss = 0.0

    # Iterate over train batches
    for i, batch in enumerate(dataloader):

        # Step
        batch = [item.to(self.device) for item in batch]  # Set device
        inputs, targets = batch[:-1], batch[-1]
        self.optimizer.zero_grad()  # Reset gradients
        z = self.model(inputs)  # Forward pass
        J = self.loss_fn(z, targets)  # Define loss
        J.backward()  # Backward pass
        self.optimizer.step()  # Update weights

        # Cumulative Metrics (incremental mean over the batches seen so far)
        loss += (J.detach().item() - loss) / (i + 1)

    return loss
接下来,我们将定义 `eval_step`,它同时用于处理验证和测试数据拆分,因为两者都不需要梯度更新,并且输出相同的指标。
def eval_step(self, dataloader):
    """Evaluate the model on ``dataloader`` without updating weights.

    Args:
        dataloader: Iterable of batches whose last item is the targets and
            whose preceding items are the model inputs.

    Returns:
        Tuple ``(loss, y_trues, y_probs)``: the running mean of the per-batch
        loss, the stacked true labels and the stacked softmax probabilities.
    """
    # Set model to eval mode
    self.model.eval()
    loss = 0.0
    y_trues, y_probs = [], []

    # Iterate over val batches
    with torch.inference_mode():
        for i, batch in enumerate(dataloader):

            # Step
            batch = [item.to(self.device) for item in batch]  # Set device
            inputs, y_true = batch[:-1], batch[-1]
            z = self.model(inputs)  # Forward pass
            J = self.loss_fn(z, y_true).item()

            # Cumulative Metrics (incremental mean over the batches seen so far)
            loss += (J - loss) / (i + 1)

            # Store outputs; normalize across the class dimension explicitly
            # instead of relying on softmax's implicit dim choice.
            y_prob = F.softmax(z, dim=1).cpu().numpy()
            y_probs.extend(y_prob)
            y_trues.extend(y_true.cpu().numpy())

    return loss, np.vstack(y_trues), np.vstack(y_probs)
最后一个函数是用于推理的 `predict_step`。它与 `eval_step` 非常相似,只是不计算任何指标;它返回预测结果,我们可以用这些结果来计算性能分数。
def predict_step(self, dataloader):
    """Return the model's softmax probabilities for every sample.

    Args:
        dataloader: Iterable of batches whose last item is the targets
            (ignored here) and whose preceding items are the model inputs.

    Returns:
        Array of stacked per-sample class probabilities.
    """
    # Set model to eval mode
    self.model.eval()
    y_probs = []

    # Iterate over val batches
    with torch.inference_mode():
        for batch in dataloader:

            # Move the batch to the model's device, as the train/eval steps
            # do, so inference also works when the model lives on a GPU.
            batch = [item.to(self.device) for item in batch]

            # Forward pass w/ inputs (the trailing targets are not needed)
            inputs = batch[:-1]
            z = self.model(inputs)

            # Store outputs; normalize across the class dimension explicitly
            y_prob = F.softmax(z, dim=1).cpu().numpy()
            y_probs.extend(y_prob)

    return np.vstack(y_probs)
随着模型不断优化、表现变好,损失会减小,我们需要做出更小的调整。如果继续使用固定的学习率,就会来回过冲。因此,我们将为优化器添加一个学习率调度器,在训练期间调整学习率。可选的调度器有很多,其中常用的一种是 `ReduceLROnPlateau`,它会在某个指标(例如验证损失)停止改进时降低学习率。在下面的示例中,当我们关注的指标(`self.scheduler.step(val_loss)`)连续三个(`patience=3`)epoch 停止下降(`mode="min"`)时,我们会把学习率乘以 0.1(`factor=0.1`)。
-
# Initialize the LR scheduler
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.1, patience=3)
...
# Illustrative sketch only: `...` marks omitted code, and `self` refers to the
# object that owns the scheduler in the full implementation.
def train_loop():
    ...
    # Steps
    train_loss = trainer.train_step(dataloader=train_dataloader)
    val_loss, _, _ = trainer.eval_step(dataloader=val_dataloader)
    self.scheduler.step(val_loss)
    ...
我们永远不应该只按任意设定的 epoch 数来训练模型,而应该有明确的停止标准(即使受到计算资源的限制)。常见的停止标准包括:验证性能在若干个 epoch(`patience`)内停滞不前、达到预期性能等。
- # Early stopping
- if val_loss < best_val_loss:
- best_val_loss = val_loss
- best_model = trainer.model
- _patience = patience # reset _patience
- else:
- _patience -= 1
- if not _patience: # 0
- print("Stopping early!")
- break
现在让我们将所有这些放在一起来训练我们的模型。
from torch.optim import Adam
LEARNING_RATE = 1e-2
NUM_EPOCHS = 100
PATIENCE = 3

# Define Loss (weighted by the per-class weights)
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)

# Define optimizer & scheduler
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.1, patience=3)
class Trainer(object):
    """Bundle a model with the train, evaluation and prediction loops."""

    def __init__(self, model, device, loss_fn=None, optimizer=None, scheduler=None):
        """Store the components used by the step methods.

        Args:
            model: Module called as ``model(inputs)`` where ``inputs`` is the
                list of batch items preceding the targets.
            device: Device each batch is moved to before the forward pass.
            loss_fn: Loss callable; needed by ``train_step``/``eval_step``.
            optimizer: Optimizer; needed by ``train_step``.
            scheduler: Optional scheduler stepped with the validation loss.
        """
        # Set params
        self.model = model
        self.device = device
        self.loss_fn = loss_fn
        self.optimizer = optimizer
        self.scheduler = scheduler

    def train_step(self, dataloader):
        """Train for one pass over ``dataloader``; return the mean batch loss."""
        # Set model to train mode
        self.model.train()
        loss = 0.0

        # Iterate over train batches
        for i, batch in enumerate(dataloader):

            # Step
            batch = [item.to(self.device) for item in batch]  # Set device
            inputs, targets = batch[:-1], batch[-1]
            self.optimizer.zero_grad()  # Reset gradients
            z = self.model(inputs)  # Forward pass
            J = self.loss_fn(z, targets)  # Define loss
            J.backward()  # Backward pass
            self.optimizer.step()  # Update weights

            # Cumulative Metrics (incremental mean over the batches seen so far)
            loss += (J.detach().item() - loss) / (i + 1)

        return loss

    def eval_step(self, dataloader):
        """Evaluate without weight updates.

        Returns:
            Tuple ``(loss, y_trues, y_probs)``: mean batch loss, stacked true
            labels and stacked softmax probabilities.
        """
        # Set model to eval mode
        self.model.eval()
        loss = 0.0
        y_trues, y_probs = [], []

        # Iterate over val batches
        with torch.inference_mode():
            for i, batch in enumerate(dataloader):

                # Step
                batch = [item.to(self.device) for item in batch]  # Set device
                inputs, y_true = batch[:-1], batch[-1]
                z = self.model(inputs)  # Forward pass
                J = self.loss_fn(z, y_true).item()

                # Cumulative Metrics
                loss += (J - loss) / (i + 1)

                # Store outputs; normalize across the class dimension
                # explicitly rather than via softmax's implicit dim.
                y_prob = F.softmax(z, dim=1).cpu().numpy()
                y_probs.extend(y_prob)
                y_trues.extend(y_true.cpu().numpy())

        return loss, np.vstack(y_trues), np.vstack(y_probs)

    def predict_step(self, dataloader):
        """Return the stacked softmax probabilities for every sample."""
        # Set model to eval mode
        self.model.eval()
        y_probs = []

        # Iterate over val batches
        with torch.inference_mode():
            for batch in dataloader:

                # Move the batch to the model's device, matching the other
                # steps, so inference also works when the model is on a GPU.
                batch = [item.to(self.device) for item in batch]

                # Forward pass w/ inputs (the trailing targets are not needed)
                inputs = batch[:-1]
                z = self.model(inputs)

                # Store outputs
                y_prob = F.softmax(z, dim=1).cpu().numpy()
                y_probs.extend(y_prob)

        return np.vstack(y_probs)

    def train(self, num_epochs, patience, train_dataloader, val_dataloader):
        """Train with LR scheduling and early stopping.

        Args:
            num_epochs: Maximum number of epochs.
            patience: Number of consecutive non-improving epochs tolerated
                before training stops.
            train_dataloader: Batches used by ``train_step``.
            val_dataloader: Batches used by ``eval_step``.

        Returns:
            A snapshot (deep copy) of the model taken at the epoch with the
            lowest validation loss, or the live model if no epoch improved.
        """
        import copy  # local import keeps the class self-contained

        best_val_loss = np.inf
        # Initialize up front so neither name is unbound when the first
        # validation loss is not an improvement (e.g. NaN) or num_epochs is 0.
        best_model = self.model
        _patience = patience
        for epoch in range(num_epochs):
            # Steps
            train_loss = self.train_step(dataloader=train_dataloader)
            val_loss, _, _ = self.eval_step(dataloader=val_dataloader)
            if self.scheduler is not None:  # scheduler is optional in __init__
                self.scheduler.step(val_loss)

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # Snapshot the weights: a bare reference would keep changing
                # during later (worse) epochs.
                best_model = copy.deepcopy(self.model)
                _patience = patience  # reset _patience
            else:
                _patience -= 1
            if _patience <= 0:
                print("Stopping early!")
                break

            # Logging
            print(
                f"Epoch: {epoch+1} | "
                f"train_loss: {train_loss:.5f}, "
                f"val_loss: {val_loss:.5f}, "
                f"lr: {self.optimizer.param_groups[0]['lr']:.2E}, "
                f"_patience: {_patience}"
            )
        return best_model
-
# Trainer module
trainer = Trainer(
    model=model, device=device, loss_fn=loss_fn,
    optimizer=optimizer, scheduler=scheduler)

# Train
best_model = trainer.train(
    NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)
- Epoch: 1 | train_loss: 0.73999, val_loss: 0.58441, lr: 1.00E-02, _patience: 3
- Epoch: 2 | train_loss: 0.52631, val_loss: 0.41542, lr: 1.00E-02, _patience: 3
- Epoch: 3 | train_loss: 0.40919, val_loss: 0.30673, lr: 1.00E-02, _patience: 3
- Epoch: 4 | train_loss: 0.31421, val_loss: 0.22428, lr: 1.00E-02, _patience: 3
- ...
- Epoch: 48 | train_loss: 0.04100, val_loss: 0.02100, lr: 1.00E-02, _patience: 2
- Epoch: 49 | train_loss: 0.04155, val_loss: 0.02008, lr: 1.00E-02, _patience: 3
- Epoch: 50 | train_loss: 0.05295, val_loss: 0.02094, lr: 1.00E-02, _patience: 2
- Epoch: 51 | train_loss: 0.04619, val_loss: 0.02179, lr: 1.00E-02, _patience: 1
- Stopping early!
- import json
- from sklearn.metrics import precision_recall_fscore_support
def get_metrics(y_true, y_pred, classes):
    """Compute overall and per-class precision, recall, F1 and sample counts.

    Args:
        y_true: True labels.
        y_pred: Predicted labels.
        classes: Class names; the i-th name labels the i-th entry of the
            per-class scores.

    Returns:
        Dict with an ``"overall"`` entry (weighted-average scores plus
        ``num_samples``) and a ``"class"`` entry keyed by class name.
    """
    # Performance
    performance = {"overall": {}, "class": {}}

    # Overall performance
    metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
    performance["overall"]["precision"] = metrics[0]
    performance["overall"]["recall"] = metrics[1]
    performance["overall"]["f1"] = metrics[2]
    performance["overall"]["num_samples"] = np.float64(len(y_true))

    # Per-class performance
    metrics = precision_recall_fscore_support(y_true, y_pred, average=None)
    for i, class_ in enumerate(classes):
        performance["class"][class_] = {
            "precision": metrics[0][i],
            "recall": metrics[1][i],
            "f1": metrics[2][i],
            "num_samples": np.float64(metrics[3][i]),
        }

    return performance
-
# Get predictions
test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)

# Determine performance
performance = get_metrics(
    y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print(json.dumps(performance["overall"], indent=2))
- {
- "precision": 0.9956140350877193,
- "recall": 0.9955555555555556,
- "f1": 0.9955553580159119,
- "num_samples": 225.0
- }
许多教程从未向您展示如何保存您创建的组件,以便您可以加载它们进行推理。
from pathlib import Path
# Save artifacts
# NOTE: `dir` shadows the builtin, but the load step below reads this name,
# so it is kept as is.
dir = Path("mlp")
dir.mkdir(parents=True, exist_ok=True)
label_encoder.save(fp=Path(dir, "label_encoder.json"))
X_scaler.save(fp=Path(dir, "X_scaler.json"))
torch.save(best_model.state_dict(), Path(dir, "model.pt"))
with open(Path(dir, 'performance.json'), "w") as fp:
    json.dump(performance, indent=2, sort_keys=False, fp=fp)
# Load artifacts (onto the CPU for inference)
device = torch.device("cpu")
label_encoder = LabelEncoder.load(fp=Path(dir, "label_encoder.json"))
X_scaler = StandardScaler.load(fp=Path(dir, "X_scaler.json"))
model = MLP(
    input_dim=INPUT_DIM, hidden_dim=HIDDEN_DIM,
    dropout_p=DROPOUT_P, num_classes=NUM_CLASSES)
model.load_state_dict(torch.load(Path(dir, "model.pt"), map_location=device))
model.to(device)
- MLP(
- (fc1): Linear(in_features=2, out_features=100, bias=True)
- (dropout): Dropout(p=0.1, inplace=False)
- (fc2): Linear(in_features=100, out_features=3, bias=True)
- )
# Initialize trainer
trainer = Trainer(model=model, device=device)

# Dataloader
sample = [[0.106737, 0.114197]]  # c1
X = X_scaler.scale(sample)
# The Dataset needs targets, so fill them with an arbitrary known class;
# predict_step ignores them.
y_filler = label_encoder.encode([label_encoder.classes[0]]*len(X))
dataset = Dataset(X=X, y=y_filler)
dataloader = dataset.create_dataloader(batch_size=batch_size)

# Inference
y_prob = trainer.predict_step(dataloader)
y_pred = np.argmax(y_prob, axis=1)
label_encoder.decode(y_pred)
["c1"]