

import torch.nn.functional as F
import torchvision
from mmengine.model import BaseModel
class MMResNet50(BaseModel):
def __init__(self):
super().__init__()
self.resnet = torchvision.models.resnet50()
def forward(self, imgs, labels, mode):
x = self.resnet(imgs)
if mode == 'loss':
return {'loss': F.cross_entropy(x, labels)}
elif mode == 'predict':
return x, labels
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
norm_cfg = dict(mean=[0.491, 0.482, 0.447], std=[0.202, 0.199, 0.201])
train_dataloader = DataLoader(batch_size=32,
shuffle=True,
dataset=torchvision.datasets.CIFAR10(
'data/cifar10',
train=True,
download=True,
transform=transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(**norm_cfg)
])))
val_dataloader = DataLoader(batch_size=32,
shuffle=False,
dataset=torchvision.datasets.CIFAR10(
'data/cifar10',
train=False,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(**norm_cfg)
])))
from mmengine.evaluator import BaseMetric
class Accuracy(BaseMetric):
def process(self, data_batch, data_samples):
score, gt = data_samples
# 将一个批次的中间结果保存至 `self.results`
self.results.append({
'batch_size': len(gt),
'correct': (score.argmax(dim=1) == gt).sum().cpu(),
})
def compute_metrics(self, results):
total_correct = sum(item['correct'] for item in results)
total_size = sum(item['batch_size'] for item in results)
# 返回保存有评测指标结果的字典,其中键为指标名称
return dict(accuracy=100 * total_correct / total_size)
from torch.optim import SGD
from mmengine.runner import Runner
runner = Runner(
# 用以训练和验证的模型,需要满足特定的接口需求
model=MMResNet50(),
# 工作路径,用以保存训练日志、权重文件信息
work_dir='./work_dir',
# 训练数据加载器,需要满足 PyTorch 数据加载器协议
train_dataloader=train_dataloader,
# 优化器包装,用于模型优化,并提供 AMP、梯度累积等附加功能
optim_wrapper=dict(optimizer=dict(type=SGD, lr=0.001, momentum=0.9)),
# 训练配置,用于指定训练周期、验证间隔等信息
train_cfg=dict(by_epoch=True, max_epochs=5, val_interval=1),
# 验证数据加载器,需要满足 PyTorch 数据加载器协议
val_dataloader=val_dataloader,
# 验证配置,用于指定验证所需要的额外参数
val_cfg=dict(),
# 用于验证的评测器,这里使用默认评测器,并评测指标
val_evaluator=dict(type=Accuracy),
)
runner.train()
class Registry:
"""A registry to map strings to classes or functions.
Registered object could be built from registry. Meanwhile, registered
functions could be called from registry.
Args:
name (str): Registry name.
build_func (callable, optional): A function to construct instance
from Registry. :func:`build_from_cfg` is used if neither ``parent``
or ``build_func`` is specified. If ``parent`` is specified and
``build_func`` is not given, ``build_func`` will be inherited
from ``parent``. Defaults to None.
parent (:obj:`Registry`, optional): Parent registry. The class
registered in children registry could be built from parent.
Defaults to None.
scope (str, optional): The scope of registry. It is the key to search
for children registry. If not specified, scope will be the name of
the package where class is defined, e.g. mmdet, mmcls, mmseg.
Defaults to None.
Examples:
>>> # define a registry
>>> MODELS = Registry('models')
>>> # registry the `ResNet` to `MODELS`
>>> @MODELS.register_module()
>>> class ResNet:
>>> pass
>>> # build model from `MODELS`
>>> resnet = MODELS.build(dict(type='ResNet'))
>>> @MODELS.register_module()
>>> def resnet50():
>>> pass
>>> resnet = MODELS.build(dict(type='resnet50'))
>>> # hierarchical registry
>>> DETECTORS = Registry('detectors', parent=MODELS, scope='det')
>>> @DETECTORS.register_module()
>>> class FasterRCNN:
>>> pass
>>> fasterrcnn = DETECTORS.build(dict(type='FasterRCNN'))
More advanced usages can be found at
https://mmengine.readthedocs.io/en/latest/tutorials/registry.html.
"""
def __init__(self,
name: str,
build_func: Optional[Callable] = None,
parent: Optional['Registry'] = None,
scope: Optional[str] = None):
from .build_functions import build_from_cfg
self._name = name
self._module_dict: Dict[str, Type] = dict()
self._children: Dict[str, 'Registry'] = dict()
if scope is not None:
assert isinstance(scope, str)
self._scope = scope
else:
self._scope = self.infer_scope()
# See https://mypy.readthedocs.io/en/stable/common_issues.html#
# variables-vs-type-aliases for the use
self.parent: Optional['Registry']
if parent is not None:
assert isinstance(parent, Registry)
parent._add_child(self)
self.parent = parent
else:
self.parent = None
# self.build_func will be set with the following priority:
# 1. build_func
# 2. parent.build_func
# 3. build_from_cfg
self.build_func: Callable
if build_func is None:
if self.parent is not None:
self.build_func = self.parent.build_func
else:
self.build_func = build_from_cfg
else:
self.build_func = build_func
from mmengine import Registry
# scope 表示注册器的作用域,如果不设置,默认为包名,例如在 mmdetection 中,它的 scope 为 mmdet
ACTIVATION = Registry('activation', scope='mmengine')
import torch.nn as nn
# 使用注册器管理模块
@ACTIVATION.register_module()
class Sigmoid(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
print('call Sigmoid.forward')
return x
@ACTIVATION.register_module()
class ReLU(nn.Module):
def __init__(self, inplace=False):
super().__init__()
def forward(self, x):
print('call ReLU.forward')
return x
@ACTIVATION.register_module()
class Softmax(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
print('call Softmax.forward')
return x
print(ACTIVATION.module_dict)
# {
# 'Sigmoid': __main__.Sigmoid,
# 'ReLU': __main__.ReLU,
# 'Softmax': __main__.Softmax
# }
import torch
input = torch.randn(2)
act_cfg = dict(type='Sigmoid')
activation = ACTIVATION.build(act_cfg)
output = activation(input)
# call Sigmoid.forward
print(output)
#如果我们想使用 ReLU,仅需修改配置。
act_cfg = dict(type='ReLU', inplace=True)
activation = ACTIVATION.build(act_cfg)
output = activation(input)
# call ReLU.forward
print(output)
test_int = 1
test_list = [1, 2, 3]
test_dict = dict(key1='value1', key2=0.1)
{
"test_int": 1,
"test_list": [1, 2, 3],
"test_dict": {"key1": "value1", "key2": 0.1}
}
test_int: 1
test_list: [1, 2, 3]
test_dict:
key1: "value1"
key2: 0.1
from mmengine.config import Config
cfg = Config.fromfile('learn_read_config.py')
print(cfg)
Config (path: learn_read_config.py): {'test_int': 1, 'test_list': [1, 2, 3], 'test_dict': {'key1': 'value1', 'key2': 0.1}}
print(cfg.test_int)
print(cfg.test_list)
print(cfg.test_dict)
cfg.test_int = 2
print(cfg['test_int'])
print(cfg['test_list'])
print(cfg['test_dict'])
cfg['test_list'][1] = 3
print(cfg['test_list'])
1
[1, 2, 3]
{'key1': 'value1', 'key2': 0.1}
2
[1, 2, 3]
{'key1': 'value1', 'key2': 0.1}
[1, 3, 3]
optimizer = dict(type='SGD', lr=0.1, momentum=0.9, weight_decay=0.0001)
from mmengine import Config, optim
from mmengine.registry import OPTIMIZERS # 在mmengine root.py中定义
import torch.nn as nn
cfg = Config.fromfile('config_sgd.py')
model = nn.Conv2d(1, 1, 1)
cfg.optimizer.params = model.parameters()
optimizer = OPTIMIZERS.build(cfg.optimizer)
print(optimizer)
SGD (
Parameter Group 0
dampening: 0
foreach: None
lr: 0.1
maximize: False
momentum: 0.9
nesterov: False
weight_decay: 0.0001
)
optimizer = dict(type='SGD', lr=0.02, momentum=0.9, weight_decay=0.0001)
_base_ = ['optimizer_cfg.py']
model = dict(type='ResNet', depth=50)
cfg = Config.fromfile('resnet50.py')
print(cfg.optimizer)
{'type': 'SGD', 'lr': 0.02, 'momentum': 0.9, 'weight_decay': 0.0001}
gpu_ids = [0, 1]
_base_ = ['optimizer_cfg.py', 'runtime_cfg.py']
model = dict(type='ResNet', depth=50)
cfg = Config.fromfile('resnet50_runtime.py')
print(cfg.optimizer)
{'type': 'SGD', 'lr': 0.02, 'momentum': 0.9, 'weight_decay': 0.0001}
_base_ = ['optimizer_cfg.py', 'runtime_cfg.py']
model = dict(type='ResNet', depth=50)
optimizer = dict(lr=0.01)
cfg = Config.fromfile('resnet50_lr0.01.py')
print(cfg.optimizer)
{'type': 'SGD', 'lr': 0.01, 'momentum': 0.9, 'weight_decay': 0.0001}
_base_ = ['optimizer_cfg.py', 'runtime_cfg.py']
model = dict(type='ResNet', depth=50)
gpu_ids = [0]
_base_ = ['optimizer_cfg.py', 'runtime_cfg.py']
model = dict(type='ResNet', depth=50)
optimizer = dict(_delete_=True, type='SGD', lr=0.01)
cfg = Config.fromfile('resnet50_delete_key.py')
print(cfg.optimizer)
{'type': 'SGD', 'lr': 0.01}
@RUNNERS.register_module()
class Runner:
"""A training helper for PyTorch.
Runner object can be built from config by ``runner = Runner.from_cfg(cfg)``
where the ``cfg`` usually contains training, validation, and test-related
configurations to build corresponding components. We usually use the
same config to launch training, testing, and validation tasks. However,
only some of these components are necessary at the same time, e.g.,
testing a model does not need training or validation-related components.
To avoid repeatedly modifying config, the construction of ``Runner`` adopts
lazy initialization to only initialize components when they are going to be
used. Therefore, the model is always initialized at the beginning, and
training, validation, and, testing related components are only initialized
when calling ``runner.train()``, ``runner.val()``, and ``runner.test()``,
respectively.
Args:
model (:obj:`torch.nn.Module` or dict): The model to be run. It can be
a dict used for build a model.
work_dir (str): The working directory to save checkpoints. The logs
will be saved in the subdirectory of `work_dir` named
:attr:`timestamp`.
train_dataloader (Dataloader or dict, optional): A dataloader object or
a dict to build a dataloader. If ``None`` is given, it means
skipping training steps. Defaults to None.
See :meth:`build_dataloader` for more details.
val_dataloader (Dataloader or dict, optional): A dataloader object or
a dict to build a dataloader. If ``None`` is given, it means
skipping validation steps. Defaults to None.
See :meth:`build_dataloader` for more details.
test_dataloader (Dataloader or dict, optional): A dataloader object or
a dict to build a dataloader. If ``None`` is given, it means
skipping test steps. Defaults to None.
See :meth:`build_dataloader` for more details.
train_cfg (dict, optional): A dict to build a training loop. If it does
not provide "type" key, it should contain "by_epoch" to decide
which type of training loop :class:`EpochBasedTrainLoop` or
:class:`IterBasedTrainLoop` should be used. If ``train_cfg``
specified, :attr:`train_dataloader` should also be specified.
Defaults to None. See :meth:`build_train_loop` for more details.
val_cfg (dict, optional): A dict to build a validation loop. If it does
not provide "type" key, :class:`ValLoop` will be used by default.
If ``val_cfg`` specified, :attr:`val_dataloader` should also be
specified. If ``ValLoop`` is built with `fp16=True``,
``runner.val()`` will be performed under fp16 precision.
Defaults to None. See :meth:`build_val_loop` for more details.
test_cfg (dict, optional): A dict to build a test loop. If it does
not provide "type" key, :class:`TestLoop` will be used by default.
If ``test_cfg`` specified, :attr:`test_dataloader` should also be
specified. If ``ValLoop`` is built with `fp16=True``,
``runner.val()`` will be performed under fp16 precision.
Defaults to None. See :meth:`build_test_loop` for more details.
auto_scale_lr (dict, Optional): Config to scale the learning rate
automatically. It includes ``base_batch_size`` and ``enable``.
``base_batch_size`` is the batch size that the optimizer lr is
based on. ``enable`` is the switch to turn on and off the feature.
optim_wrapper (OptimWrapper or dict, optional):
Computing gradient of model parameters. If specified,
:attr:`train_dataloader` should also be specified. If automatic
mixed precision or gradient accmulation
training is required. The type of ``optim_wrapper`` should be
AmpOptimizerWrapper. See :meth:`build_optim_wrapper` for
examples. Defaults to None.
param_scheduler (_ParamScheduler or dict or list, optional):
Parameter scheduler for updating optimizer parameters. If
specified, :attr:`optimizer` should also be specified.
Defaults to None.
See :meth:`build_param_scheduler` for examples.
val_evaluator (Evaluator or dict or list, optional): A evaluator object
used for computing metrics for validation. It can be a dict or a
list of dict to build a evaluator. If specified,
:attr:`val_dataloader` should also be specified. Defaults to None.
test_evaluator (Evaluator or dict or list, optional): A evaluator
object used for computing metrics for test steps. It can be a dict
or a list of dict to build a evaluator. If specified,
:attr:`test_dataloader` should also be specified. Defaults to None.
default_hooks (dict[str, dict] or dict[str, Hook], optional): Hooks to
execute default actions like updating model parameters and saving
checkpoints. Default hooks are ``OptimizerHook``,
``IterTimerHook``, ``LoggerHook``, ``ParamSchedulerHook`` and
``CheckpointHook``. Defaults to None.
See :meth:`register_default_hooks` for more details.
custom_hooks (list[dict] or list[Hook], optional): Hooks to execute
custom actions like visualizing images processed by pipeline.
Defaults to None.
data_preprocessor (dict, optional): The pre-process config of
:class:`BaseDataPreprocessor`. If the ``model`` argument is a dict
and doesn't contain the key ``data_preprocessor``, set the argument
as the ``data_preprocessor`` of the ``model`` dict.
Defaults to None.
load_from (str, optional): The checkpoint file to load from.
Defaults to None.
resume (bool): Whether to resume training. Defaults to False. If
``resume`` is True and ``load_from`` is None, automatically to
find latest checkpoint from ``work_dir``. If not found, resuming
does nothing.
launcher (str): Way to launcher multi-process. Supported launchers
are 'pytorch', 'mpi', 'slurm' and 'none'. If 'none' is provided,
non-distributed environment will be launched.
env_cfg (dict): A dict used for setting environment. Defaults to
dict(dist_cfg=dict(backend='nccl')).
log_processor (dict, optional): A processor to format logs. Defaults to
None.
log_level (int or str): The log level of MMLogger handlers.
Defaults to 'INFO'.
visualizer (Visualizer or dict, optional): A Visualizer object or a
dict build Visualizer object. Defaults to None. If not
specified, default config will be used.
default_scope (str): Used to reset registries location.
Defaults to "mmengine".
randomness (dict): Some settings to make the experiment as reproducible
as possible like seed and deterministic.
Defaults to ``dict(seed=None)``. If seed is None, a random number
will be generated and it will be broadcasted to all other processes
if in distributed environment. If ``cudnn_benchmarch`` is
``True`` in ``env_cfg`` but ``deterministic`` is ``True`` in
``randomness``, the value of ``torch.backends.cudnn.benchmark``
will be ``False`` finally.
experiment_name (str, optional): Name of current experiment. If not
specified, timestamp will be used as ``experiment_name``.
Defaults to None.
cfg (dict or Configdict or :obj:`Config`, optional): Full config.
Defaults to None.
Examples:
>>> from mmengine.runner import Runner
>>> cfg = dict(
>>> model=dict(type='ToyModel'),
>>> work_dir='path/of/work_dir',
>>> train_dataloader=dict(
>>> dataset=dict(type='ToyDataset'),
>>> sampler=dict(type='DefaultSampler', shuffle=True),
>>> batch_size=1,
>>> num_workers=0),
>>> val_dataloader=dict(
>>> dataset=dict(type='ToyDataset'),
>>> sampler=dict(type='DefaultSampler', shuffle=False),
>>> batch_size=1,
>>> num_workers=0),
>>> test_dataloader=dict(
>>> dataset=dict(type='ToyDataset'),
>>> sampler=dict(type='DefaultSampler', shuffle=False),
>>> batch_size=1,
>>> num_workers=0),
>>> auto_scale_lr=dict(base_batch_size=16, enable=False),
>>> optim_wrapper=dict(type='OptimizerWrapper', optimizer=dict(
>>> type='SGD', lr=0.01)),
>>> param_scheduler=dict(type='MultiStepLR', milestones=[1, 2]),
>>> val_evaluator=dict(type='ToyEvaluator'),
>>> test_evaluator=dict(type='ToyEvaluator'),
>>> train_cfg=dict(by_epoch=True, max_epochs=3, val_interval=1),
>>> val_cfg=dict(),
>>> test_cfg=dict(),
>>> custom_hooks=[],
>>> default_hooks=dict(
>>> timer=dict(type='IterTimerHook'),
>>> checkpoint=dict(type='CheckpointHook', interval=1),
>>> logger=dict(type='LoggerHook'),
>>> optimizer=dict(type='OptimizerHook', grad_clip=False),
>>> param_scheduler=dict(type='ParamSchedulerHook')),
>>> launcher='none',
>>> env_cfg=dict(dist_cfg=dict(backend='nccl')),
>>> log_processor=dict(window_size=20),
>>> visualizer=dict(type='Visualizer',
>>> vis_backends=[dict(type='LocalVisBackend',
>>> save_dir='temp_dir')])
>>> )
>>> runner = Runner.from_cfg(cfg)
>>> runner.train()
>>> runner.test()
"""
cfg: Config
_train_loop: Optional[Union[BaseLoop, Dict]]
_val_loop: Optional[Union[BaseLoop, Dict]]
_test_loop: Optional[Union[BaseLoop, Dict]]
def __init__(
self,
model: Union[nn.Module, Dict],
work_dir: str,
train_dataloader: Optional[Union[DataLoader, Dict]] = None,
val_dataloader: Optional[Union[DataLoader, Dict]] = None,
test_dataloader: Optional[Union[DataLoader, Dict]] = None,
train_cfg: Optional[Dict] = None,
val_cfg: Optional[Dict] = None,
test_cfg: Optional[Dict] = None,
auto_scale_lr: Optional[Dict] = None,
optim_wrapper: Optional[Union[OptimWrapper, Dict]] = None,
param_scheduler: Optional[Union[_ParamScheduler, Dict, List]] = None,
val_evaluator: Optional[Union[Evaluator, Dict, List]] = None,
test_evaluator: Optional[Union[Evaluator, Dict, List]] = None,
default_hooks: Optional[Dict[str, Union[Hook, Dict]]] = None,
custom_hooks: Optional[List[Union[Hook, Dict]]] = None,
data_preprocessor: Union[nn.Module, Dict, None] = None,
load_from: Optional[str] = None,
resume: bool = False,
launcher: str = 'none',
env_cfg: Dict = dict(dist_cfg=dict(backend='nccl')),
log_processor: Optional[Dict] = None,
log_level: str = 'INFO',
visualizer: Optional[Union[Visualizer, Dict]] = None,
default_scope: str = 'mmengine',
randomness: Dict = dict(seed=None),
experiment_name: Optional[str] = None,
cfg: Optional[ConfigType] = None,
):
self._work_dir = osp.abspath(work_dir)
mmengine.mkdir_or_exist(self._work_dir)
# recursively copy the `cfg` because `self.cfg` will be modified
# everywhere.
if cfg is not None:
if isinstance(cfg, Config):
self.cfg = copy.deepcopy(cfg)
elif isinstance(cfg, dict):
self.cfg = Config(cfg)
else:
self.cfg = Config(dict())
# lazy initialization
training_related = [train_dataloader, train_cfg, optim_wrapper]
if not (all(item is None for item in training_related)
or all(item is not None for item in training_related)):
raise ValueError(
'train_dataloader, train_cfg, and optimizer should be either '
'all None or not None, but got '
f'train_dataloader={train_dataloader}, '
f'train_cfg={train_cfg}, '
f'optim_wrapper={optim_wrapper}.')
self._train_dataloader = train_dataloader
self._train_loop = train_cfg
self.optim_wrapper: Optional[Union[OptimWrapper, dict]]
self.optim_wrapper = optim_wrapper
self.auto_scale_lr = auto_scale_lr
# If there is no need to adjust learning rate, momentum or other
# parameters of optimizer, param_scheduler can be None
if param_scheduler is not None and self.optim_wrapper is None:
raise ValueError(
'param_scheduler should be None when optimizer is None, '
f'but got {param_scheduler}')
# Parse `param_scheduler` to a list or a dict. If `optim_wrapper` is a
# `dict` with single optimizer, parsed param_scheduler will be a
# list of parameter schedulers. If `optim_wrapper` is
# a `dict` with multiple optimizers, parsed `param_scheduler` will be
# dict with multiple list of parameter schedulers.
self._check_scheduler_cfg(param_scheduler)
self.param_schedulers = param_scheduler
val_related = [val_dataloader, val_cfg, val_evaluator]
if not (all(item is None
for item in val_related) or all(item is not None
for item in val_related)):
raise ValueError(
'val_dataloader, val_cfg, and val_evaluator should be either '
'all None or not None, but got '
f'val_dataloader={val_dataloader}, val_cfg={val_cfg}, '
f'val_evaluator={val_evaluator}')
self._val_dataloader = val_dataloader
self._val_loop = val_cfg
self._val_evaluator = val_evaluator
test_related = [test_dataloader, test_cfg, test_evaluator]
if not (all(item is None for item in test_related)
or all(item is not None for item in test_related)):
raise ValueError(
'test_dataloader, test_cfg, and test_evaluator should be '
'either all None or not None, but got '
f'test_dataloader={test_dataloader}, test_cfg={test_cfg}, '
f'test_evaluator={test_evaluator}')
self._test_dataloader = test_dataloader
self._test_loop = test_cfg
self._test_evaluator = test_evaluator
self._launcher = launcher
if self._launcher == 'none':
self._distributed = False
else:
self._distributed = True
# self._timestamp will be set in the `setup_env` method. Besides,
# it also will initialize multi-process and (or) distributed
# environment.
self.setup_env(env_cfg)
# self._deterministic and self._seed will be set in the
# `set_randomness`` method
self._randomness_cfg = randomness
self.set_randomness(**randomness)
if experiment_name is not None:
self._experiment_name = f'{experiment_name}_{self._timestamp}'
elif self.cfg.filename is not None:
filename_no_ext = osp.splitext(osp.basename(self.cfg.filename))[0]
self._experiment_name = f'{filename_no_ext}_{self._timestamp}'
else:
self._experiment_name = self.timestamp
self._log_dir = osp.join(self.work_dir, self.timestamp)
mmengine.mkdir_or_exist(self._log_dir)
# Used to reset registries location. See :meth:`Registry.build` for
# more details.
self.default_scope = DefaultScope.get_instance(
self._experiment_name, scope_name=default_scope)
# Build log processor to format message.
log_processor = dict() if log_processor is None else log_processor
self.log_processor = self.build_log_processor(log_processor)
# Since `get_instance` could return any subclass of ManagerMixin. The
# corresponding attribute needs a type hint.
self.logger = self.build_logger(log_level=log_level)
# Collect and log environment information.
self._log_env(env_cfg)
# collect information of all modules registered in the registries
registries_info = count_registered_modules(
self.work_dir if self.rank == 0 else None, verbose=False)
self.logger.debug(registries_info)
# Build `message_hub` for communication among components.
# `message_hub` can store log scalars (loss, learning rate) and
# runtime information (iter and epoch). Those components that do not
# have access to the runner can get iteration or epoch information
# from `message_hub`. For example, models can get the latest created
# `message_hub` by
# `self.message_hub=MessageHub.get_current_instance()` and then get
# current epoch by `cur_epoch = self.message_hub.get_info('epoch')`.
# See `MessageHub` and `ManagerMixin` for more details.
self.message_hub = self.build_message_hub()
# visualizer used for writing log or visualizing all kinds of data
self.visualizer = self.build_visualizer(visualizer)
if self.cfg:
self.visualizer.add_config(self.cfg)
self._load_from = load_from
self._resume = resume
# flag to mark whether checkpoint has been loaded or resumed
self._has_loaded = False
# build a model
if isinstance(model, dict) and data_preprocessor is not None:
# Merge the data_preprocessor to model config.
model.setdefault('data_preprocessor', data_preprocessor)
self.model = self.build_model(model)
# wrap model
self.model = self.wrap_model(
self.cfg.get('model_wrapper_cfg'), self.model)
# get model name from the model class
if hasattr(self.model, 'module'):
self._model_name = self.model.module.__class__.__name__
else:
self._model_name = self.model.__class__.__name__
self._hooks: List[Hook] = []
# register hooks to `self._hooks`
self.register_hooks(default_hooks, custom_hooks)
# dump `cfg` to `work_dir`
self.dump_config()
# 准备训练任务所需要的模块
import torch
from torch import nn
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
from mmengine.model import BaseModel
from mmengine.optim.scheduler import MultiStepLR
# 定义一个多层感知机网络
class Network(BaseModel):
def __init__(self):
super().__init__()
self.mlp = nn.Sequential(nn.Linear(28 * 28, 128), nn.ReLU(), nn.Linear(128, 128), nn.ReLU(), nn.Linear(128, 10))
self.loss = nn.CrossEntropyLoss()
def forward(self, batch_inputs: torch.Tensor, data_samples = None, mode: str = 'tensor'):
x = batch_inputs.flatten(1)
x = self.mlp(x)
if mode == 'loss':
return {'loss': self.loss(x, data_samples)}
elif mode == 'predict':
return x.argmax(1)
else:
return x
model = Network()
# 构建优化器
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# 构建参数调度器用于调整学习率
lr_scheduler = MultiStepLR(optimizer, milestones=[2], by_epoch=True)
# 构建手写数字识别 (MNIST) 数据集
train_dataset = datasets.MNIST(root="MNIST", download=True, train=True, transform=transforms.ToTensor())
# 构建数据加载器
train_dataloader = DataLoader(dataset=train_dataset, batch_size=10, num_workers=2)
from mmengine.runner import Runner
# 训练相关参数设置,按迭代次数训练,训练9000次迭代
#train_cfg = dict(by_epoch=False, max_iters=9000)
# 训练相关参数设置,按轮次训练,训练3轮
train_cfg = dict(by_epoch=True, max_epochs=3)
# 初始化执行器
runner = Runner(model,
work_dir='./train_mnist', # 工作目录,用于保存模型和日志
train_cfg=train_cfg,
train_dataloader=train_dataloader,
optim_wrapper=dict(optimizer=optimizer),
param_scheduler=lr_scheduler)
# 执行训练
runner.train()
from mmengine.evaluator import BaseMetric
class MnistAccuracy(BaseMetric):
def process(self, data, preds) -> None:
self.results.append(((data[1] == preds.cpu()).sum(), len(preds)))
def compute_metrics(self, results):
correct, batch_size = zip(*results)
acc = sum(correct) / sum(batch_size)
return dict(accuracy=acc)
model = Network()
test_dataset = datasets.MNIST(root="MNIST", download=True, train=False, transform=transforms.ToTensor())
test_dataloader = DataLoader(dataset=test_dataset)
metric = MnistAccuracy()
test_evaluator = Evaluator(metric)
# 初始化执行器
runner = Runner(model=model,
test_dataloader=test_dataloader,
test_evaluator=test_evaluator,
load_from='./train_mnist/epoch_3.pth',
work_dir='./test_mnist')
# 执行测试
runner.test()
# 准备训练任务所需要的模块
optimzier = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
lr_scheduler = MultiStepLR(milestones=[2], by_epoch=True)
train_dataset = datasets.MNIST(root="MNIST", download=True, train=True, transform=transforms.ToTensor())
train_dataloader = DataLoader(dataset=train_dataset, batch_size=10, num_workers=2)
# 准备验证需要的模块
val_dataset = datasets.MNIST(root="MNIST", download=True, train=False, transform=transforms.ToTensor())
val_dataloader = Dataloader(dataset=val_dataset)
metric = MnistAccuracy()
val_evaluator = Evaluator(metric)
# 训练相关参数设置
train_cfg = dict(by_epoch=True, # 按轮次训练
max_epochs=5, # 训练5轮
val_begin=2, # 从第 2 个 epoch 开始验证
val_interval=1) # 每隔1轮进行1次验证
# 初始化执行器
runner = Runner(model=model,
optim_wrapper=dict(optimizer=optimzier),
param_scheduler=lr_scheduler,
train_dataloader=train_dataloader,
val_dataloader=val_dataloader,
val_evaluator=val_evaluator,
train_cfg=train_cfg,
work_dir='./train_val_mnist')
# 执行训练
runner.train()
from mmengine import Config
from mmengine.runner import Runner
# 加载配置文件
config = Config.fromfile('configs/resnet/resnet50_8xb32_in1k.py')
# 通过配置文件初始化执行器
runner = Runner.build_from_cfg(config)
# 执行训练
runner.train()
# 执行测试
runner.test()
# 工作目录,保存权重和日志
work_dir = './train_resnet'
# 默认注册器域
default_scope = 'mmcls' # 默认使用 `mmcls` (MMClassification) 注册器中的模块
# 模型配置
model = dict(type='ImageClassifier',
backbone=dict(type='ResNet', depth=50),
neck=dict(type='GlobalAveragePooling'),
head=dict(type='LinearClsHead',num_classes=1000))
# 数据配置
train_dataloader = dict(dataset=dict(type='ImageNet', pipeline=[...]),
sampler=dict(type='DefaultSampler', shuffle=True),
batch_size=32,
num_workers=4)
val_dataloader = ...
test_dataloader = ...
# 优化器配置
optim_wrapper = dict(
optimizer=dict(type='SGD', lr=0.1, momentum=0.9, weight_decay=0.0001))
# 参数调度器配置
param_scheduler = dict(
type='MultiStepLR', by_epoch=True, milestones=[30, 60, 90], gamma=0.1)
#验证和测试的评测器配置
val_evaluator = dict(type='Accuracy')
test_evaluator = dict(type='Accuracy')
# 训练、验证、测试流程配置
train_cfg = dict(
by_epoch=True,
max_epochs=100,
val_begin=20, # 从第 20 个 epoch 开始验证
val_interval=1 # 每隔一个 epoch 进行一次验证
)
val_cfg = dict()
test_cfg = dict()
# 自定义钩子 (可选)
custom_hooks = [...]
# 默认钩子 (可选,未在配置文件中写明时将使用默认配置)
default_hooks = dict(
runtime_info=dict(type='RuntimeInfoHook'), # 运行时信息钩子
timer=dict(type='IterTimerHook'), # 计时器钩子
sampler_seed=dict(type='DistSamplerSeedHook'), # 为每轮次的数据采样设置随机种子的钩子
logger=dict(type='TextLoggerHook'), # 训练日志钩子
param_scheduler=dict(type='ParamSchedulerHook'), # 参数调度器执行钩子
checkpoint=dict(type='CheckpointHook', interval=1), # 模型保存钩子
)
# 环境配置 (可选,未在配置文件中写明时将使用默认配置)
env_cfg = dict(
cudnn_benchmark=False, # 是否使用 cudnn_benchmark
dist_cfg=dict(backend='nccl'), # 分布式通信后端
mp_cfg=dict(mp_start_method='fork') # 多进程设置
)
# 日志处理器 (可选,未在配置文件中写明时将使用默认配置)
log_processor = dict(type='LogProcessor', window_size=50, by_epoch=True)
# 日志等级配置
log_level = 'INFO'
# 加载权重的路径 (None 表示不加载)
load_from = None
# 从加载的权重文件中恢复训练
resume = False
runner = Runner(model=model,
test_dataloader=test_dataloader,
test_evaluator=test_evaluator,
load_from='./resnet50.pth')
| 名称 | 用途 | 优先级 |
|---|---|---|
| RuntimeInfoHook | 往 message hub 更新运行时信息 | VERY_HIGH (10) |
| IterTimerHook | 统计迭代耗时 | NORMAL (50) |
| DistSamplerSeedHook | 确保分布式 Sampler 的 shuffle 生效 | NORMAL (50) |
| LoggerHook | 打印日志 | BELOW_NORMAL (60) |
| ParamSchedulerHook | 调用 ParamScheduler 的 step 方法 | LOW (70) |
| CheckpointHook | 按指定间隔保存权重 | VERY_LOW (90) |
| 名称 | 用途 | 优先级 |
|---|---|---|
| EMAHook | 模型参数指数滑动平均 | NORMAL (50) |
| EmptyCacheHook | PyTorch CUDA 缓存清理 | NORMAL (50) |
| SyncBuffersHook | 同步模型的 buffer | NORMAL (50) |
| NaiveVisualizationHook | 可视化 | LOWEST (100) |
from mmengine.runner import Runner
default_hooks = dict(
runtime_info=dict(type='RuntimeInfoHook'),
timer=dict(type='IterTimerHook'),
sampler_seed=dict(type='DistSamplerSeedHook'),
logger=dict(type='LoggerHook'),
param_scheduler=dict(type='ParamSchedulerHook'),
checkpoint=dict(type='CheckpointHook', interval=1),
)
custom_hooks = [
dict(type='NaiveVisualizationHook', priority='LOWEST'),
]
runner = Runner(default_hooks=default_hooks, custom_hooks=custom_hooks, ...)
runner.train()
# by_epoch 的默认值为 True, 每隔 5 个 epoch 保存一次权重
default_hooks = dict(checkpoint=dict(type='CheckpointHook', interval=5, by_epoch=True))
# 以迭代次数作为保存间隔,则可以将 by_epoch 设为 False,
# interval=5 则表示每迭代 5 次保存一次权重
default_hooks = dict(checkpoint=dict(type='CheckpointHook', interval=5, by_epoch=False))
# 假如一共训练 20 个 epoch,那么会在第 5, 10, 15, 20 个 epoch 保存模型,
# 但是在第 15 个 epoch 的时候会删除第 5 个 epoch 保存的权重,在第 20 个 epoch 的时候
# 会删除第 10 个 epoch 的权重,最终只有第 15 和第 20 个 epoch 的权重才会被保存。
default_hooks = dict(checkpoint=dict(type='CheckpointHook', interval=5, max_keep_ckpts=2))
# 如果想要保存训练过程验证集的最优权重,可以设置 save_best 参数,如果设置为 'auto',
# 则会根据验证集的第一个评价指标(验证集返回的评价指标是一个有序字典)判断当前权重是否最优。
default_hooks = dict(checkpoint=dict(type='CheckpointHook', save_best='auto'))
default_hooks = dict(checkpoint=dict(type='CheckpointHook', interval=5, out_dir='/path/of/directory'))
default_hooks = dict(logger=dict(type='LoggerHook', interval=20))
import torch
from mmengine.registry import HOOKS
from mmengine.hooks import Hook
@HOOKS.register_module()
class CheckInvalidLossHook(Hook):
"""Check invalid loss hook.
This hook will regularly check whether the loss is valid
during training.
Args:
interval (int): Checking interval (every k iterations).
Defaults to 50.
"""
def __init__(self, interval=50):
self.interval = interval
def after_train_iter(self, runner, batch_idx, data_batch=None, outputs=None):
"""All subclasses should override this method, if they need any
operations after each training iteration.
Args:
runner (Runner): The runner of the training process.
batch_idx (int): The index of the current batch in the train loop.
data_batch (dict or tuple or list, optional): Data from dataloader.
outputs (dict, optional): Outputs from model.
"""
if self.every_n_train_iters(runner, self.interval):
assert torch.isfinite(outputs['loss']),\
runner.logger.info('loss become infinite or NaN!')
from mmengine.runner import Runner
custom_hooks = dict(
dict(type='CheckInvalidLossHook', interval=50)
)
runner = Runner(custom_hooks=custom_hooks, ...) # 实例化执行器,主要完成环境的初始化以及各种模块的构建
runner.train() # 执行器开始训练
from torch.utils.data import DataLoader
from torch import nn
from torchvision import datasets
from torchvision.transforms import ToTensor
from mmengine.model import BaseModel
from mmengine.evaluator import BaseMetric
from mmengine.runner import Runner
training_data = datasets.FashionMNIST(
root="data",
train=True,
download=True,
transform=ToTensor()
)
test_data = datasets.FashionMNIST(
root="data",
train=False,
download=True,
transform=ToTensor()
)
train_dataloader = DataLoader(dataset=training_data, batch_size=64)
test_dataloader = DataLoader(dataset=test_data, batch_size=64)
class NeuralNetwork(BaseModel):
def __init__(self, data_preprocessor=None):
super(NeuralNetwork, self).__init__(data_preprocessor)
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(28*28, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10),
)
self.loss = nn.CrossEntropyLoss()
def forward(self, img, label, mode='tensor'):
x = self.flatten(img)
pred = self.linear_relu_stack(x)
loss = self.loss(pred, label)
if mode == 'loss':
return dict(loss=loss)
elif mode=='predict':
return pred.argmax(1), loss.item()
else:
return pred
class FashionMnistMetric(BaseMetric):
def process(self, data, preds) -> None:
# data 参数为 Dataloader 返回的元组,即 (img, label)
# predict 为模型 `predict` 模式下,返回的元组,分别为 `pred.argmax(1) 和 `loss``
self.results.append(((data[1] == preds[0].cpu()).sum(), preds[1], len(preds[0])))
def compute_metrics(self, results):
correct, loss, batch_size = zip(*results)
test_loss, correct = sum(loss) / len(self.results), sum(correct) / sum(batch_size)
return dict(Accuracy=correct, Avg_loss=test_loss)
runner = Runner(
model=NeuralNetwork(),
work_dir='./work_dir',
train_dataloader=train_dataloader,
optim_wrapper=dict(optimizer=dict(type='SGD', lr=1e-3)),
train_cfg=dict(by_epoch=True, max_epochs=5, val_interval=1),
val_cfg=dict(fp16=True),
val_dataloader=test_dataloader,
val_evaluator=dict(metrics=FashionMnistMetric()))
runner.train()
from torch.optim import SGD
from mmengine.model import BaseDataPreprocessor, BaseModel
class NeuralNetwork1(NeuralNetwork):
def __init__(self, data_preprocessor):
super().__init__(data_preprocessor=data_preprocessor)
self.data_preprocessor = data_preprocessor
def train_step(self, data, optimizer):
img, label = self.data_preprocessor(data)
loss = self(img, label, mode='loss')['loss'].sum()
loss.backward()
optimizer.step()
optimizer.zero_grad()
return dict(loss=loss)
def test_step(self, data):
img, label = self.data_preprocessor(data)
return self(img, label, mode='predict')
def val_step(self, data):
img, label = self.data_preprocessor(data)
return self(img, label, mode='predict')
class NormalizeDataPreprocessor(BaseDataPreprocessor):
def forward(self, data, training=False):
img, label = [item for item in data]
img = (img - 127.5) / 127.5
return img, label
model = NeuralNetwork1(data_preprocessor=NormalizeDataPreprocessor())
optimizer = SGD(model.parameters(), lr=0.01)
data = (torch.full((3, 28, 28), fill_value=127.5), torch.ones(3, 10))
model.train_step(data, optimizer)
model.val_step(data)
model.test_step(data)
val_evaluator = dict(type='Accuracy', top_k=(1, 5)) # 使用分类正确率评测指标
test_evaluator = [
# 目标检测指标
dict(
type='COCOMetric',
metric=['bbox', 'segm'],
ann_file='annotations/instances_val2017.json',
),
# 全景分割指标
dict(
type='CocoPanopticMetric',
ann_file='annotations/panoptic_val2017.json',
seg_prefix='annotations/panoptic_val2017',
)
]
from mmengine.evaluator import Evaluator
from mmengine.fileio import load
# 构建评测器。参数 `metrics` 为评测指标配置
evaluator = Evaluator(metrics=dict(type='Accuracy', top_k=(1, 5)))
# 从文件中读取测试数据。数据格式需要参考具使用的 metric。
data = load('test_data.pkl')
# 从文件中读取模型预测结果。该结果由待评测算法在测试数据集上推理得到。
# 数据格式需要参考具使用的 metric。
predictions = load('prediction.pkl')
# 调用评测器离线评测接口,得到评测结果
# chunk_size 表示每次处理的样本数量,可根据内存大小调整
results = evaluator.offline_evaluate(data, predictions, chunk_size=128)
import torch
from torch.optim import SGD
import torch.nn as nn
import torch.nn.functional as F
inputs = [torch.zeros(10, 1, 1)] * 10
targets = [torch.ones(10, 1, 1)] * 10
model = nn.Linear(1, 1)
optimizer = SGD(model.parameters(), lr=0.01)
optimizer.zero_grad()
for input, target in zip(inputs, targets):
output = model(input)
loss = F.l1_loss(output, target)
loss.backward()
optimizer.step()
optimizer.zero_grad()
from mmengine.optim import OptimWrapper
optim_wrapper = OptimWrapper(optimizer=optimizer)
for input, target in zip(inputs, targets):
output = model(input)
loss = F.l1_loss(output, target)
optim_wrapper.update_params(loss)


from torch.cuda.amp import autocast
model = model.cuda()
inputs = [torch.zeros(10, 1, 1, 1)] * 10
targets = [torch.ones(10, 1, 1, 1)] * 10
for input, target in zip(inputs, targets):
with autocast():
output = model(input.cuda())
loss = F.l1_loss(output, target.cuda())
loss.backward()
optimizer.step()
optimizer.zero_grad()
from mmengine.optim import AmpOptimWrapper
optim_wrapper = AmpOptimWrapper(optimizer=optimizer)
for input, target in zip(inputs, targets):
with optim_wrapper.optim_context(model):
output = model(input.cuda())
loss = F.l1_loss(output, target.cuda())
optim_wrapper.update_params(loss)

for idx, (input, target) in enumerate(zip(inputs, targets)):
with autocast():
output = model(input.cuda())
loss = F.l1_loss(output, target.cuda())
loss.backward()
if idx % 2 == 0:
optimizer.step()
optimizer.zero_grad()
optim_wrapper = AmpOptimWrapper(optimizer=optimizer, accumulative_counts=2)
for input, target in zip(inputs, targets):
with optim_wrapper.optim_context(model):
output = model(input.cuda())
loss = F.l1_loss(output, target.cuda())
optim_wrapper.update_params(loss)

for idx, (input, target) in enumerate(zip(inputs, targets)):
optimizer.zero_grad()
with optim_wrapper.optim_context(model):
output = model(input.cuda())
loss = F.l1_loss(output, target.cuda())
optim_wrapper.backward(loss) # 计算参数梯度
if idx % 2 == 0:
optim_wrapper.step() # 更新参数
optim_wrapper.zero_grad() # 参数的梯度清0
# 基于 torch.nn.utils.clip_grad_norm_ 对梯度进行裁减
optim_wrapper = AmpOptimWrapper(
optimizer=optimizer, clip_grad=dict(max_norm=1))
# 基于 torch.nn.utils.clip_grad_value_ 对梯度进行裁减
optim_wrapper = AmpOptimWrapper(
optimizer=optimizer, clip_grad=dict(clip_value=0.2))
import torch.nn as nn
from torch.optim import SGD
from mmengine.optim import OptimWrapper
model = nn.Linear(1, 1)
optimizer = SGD(model.parameters(), lr=0.01)
optim_wrapper = OptimWrapper(optimizer)
print(optimizer.param_groups[0]['lr']) # -1.01
print(optimizer.param_groups[0]['momentum']) # 0
print(optim_wrapper.get_lr()) # {'lr': [0.01]}
print(optim_wrapper.get_momentum()) # {'momentum': [0]}
# 输出结果
#0.01
#0
#{'lr': [0.01]}
#{'momentum': [0]}
import torch.nn as nn
from torch.optim import SGD
from mmengine.optim import OptimWrapper, AmpOptimWrapper
model = nn.Linear(1, 1)
optimizer = SGD(model.parameters(), lr=0.01)
optim_wapper = OptimWrapper(optimizer=optimizer)
amp_optim_wapper = AmpOptimWrapper(optimizer=optimizer)
# 导出状态字典
optim_state_dict = optim_wapper.state_dict()
amp_optim_state_dict = amp_optim_wapper.state_dict()
print(optim_state_dict)
print(amp_optim_state_dict)
optim_wapper_new = OptimWrapper(optimizer=optimizer)
amp_optim_wapper_new = AmpOptimWrapper(optimizer=optimizer)
# 加载状态字典
amp_optim_wapper_new.load_state_dict(amp_optim_state_dict)
optim_wapper_new.load_state_dict(optim_state_dict)
{'state': {}, 'param_groups': [{'lr': 0.01, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False, 'maximize': False, 'foreach': None, 'params': [0, 1]}]}
{'state': {}, 'param_groups': [{'lr': 0.01, 'momentum': 0, 'dampening': 0, 'weight_decay': 0, 'nesterov': False, 'maximize': False, 'foreach': None, 'params': [0, 1]}], 'loss_scaler': {'scale': 65536.0, 'growth_factor': 2.0, 'backoff_factor': 0.5, 'growth_interval': 2000, '_growth_tracker': 0}}
from torch.optim import SGD
import torch.nn as nn
from mmengine.optim import OptimWrapper, OptimWrapperDict
gen = nn.Linear(1, 1)
disc = nn.Linear(1, 1)
optimizer_gen = SGD(gen.parameters(), lr=0.01)
optimizer_disc = SGD(disc.parameters(), lr=0.01)
optim_wapper_gen = OptimWrapper(optimizer=optimizer_gen)
optim_wapper_disc = OptimWrapper(optimizer=optimizer_disc)
optim_dict = OptimWrapperDict(gen=optim_wapper_gen, disc=optim_wapper_disc)
print(optim_dict.get_lr()) # {'gen.lr': [0.01], 'disc.lr': [0.01]}
print(optim_dict.get_momentum()) # {'gen.momentum': [0], 'disc.momentum': [0]}
{'gen.lr': [0.01], 'disc.lr': [0.01]}
{'gen.momentum': [0], 'disc.momentum': [0]}
optimizer = dict(type='SGD', lr=0.01, momentum=0.9, weight_decay=0.0001)
optim_wrapper = dict(type='OptimWrapper', optimizer=optimizer)
optimizer = dict(type='SGD', lr=0.01, momentum=0.9, weight_decay=0.0001)
optim_wrapper = dict(optimizer=optimizer)
optimizer = dict(type='SGD', lr=0.01, momentum=0.9, weight_decay=0.0001)
optim_wrapper = dict(type='AmpOptimWrapper', optimizer=optimizer, accumulative_counts=2)
import torch
from torch.optim import SGD
from torch.optim.lr_scheduler import ExponentialLR
model = torch.nn.Linear(1, 1)
dataset = [torch.randn((1, 1, 1)) for _ in range(20)]
optimizer = SGD(model, 0.1)
scheduler = ExponentialLR(optimizer, gamma=0.9)
for epoch in range(10):
for data in dataset:
optimizer.zero_grad()
output = model(data)
loss = 1 - output
loss.backward()
optimizer.step()
scheduler.step()
from mmengine.optim.scheduler import MultiStepLR
optimizer = SGD(model.parameters(), lr=0.01, momentum=0.9)
scheduler = MultiStepLR(optimizer, milestones=[8, 11], gamma=0.1)

scheduler = dict(type='MultiStepLR', by_epoch=True, milestones=[8, 11], gamma=0.1)
scheduler = dict(type='MultiStepLR', by_epoch=False, milestones=[600, 800], gamma=0.1)

import numpy as np
from mmcv.transforms import Resize
transform = Resize(scale=(224, 224))
data_dict = {'img': np.random.rand(256, 256, 3)}
data_dict = transform(data_dict)
print(data_dict['img'].shape)
# (224, 224, 3)

test_dataloader = dict(
batch_size=32,
dataset=dict(
type='ImageNet',
data_root='data/imagenet',
pipeline = [
dict(type='LoadImageFromFile'),
dict(type='Resize', size=256, keep_ratio=True),
dict(type='CenterCrop', crop_size=224),
dict(type='PackClsInputs'),
]
)
)
| 数据变换类 | 功能 |
|---|---|
| LoadImageFromFile | 根据路径加载图像 |
| LoadAnnotations | 加载和组织标注信息,如 bbox、语义分割图等 |
| 数据变换类 | 功能 |
|---|---|
| Pad | 填充图像边缘 |
| CenterCrop | 居中裁剪 |
| Normalize | 对图像进行归一化 |
| Resize | 按照指定尺寸或比例缩放图像 |
| RandomResize | 缩放图像至指定范围的随机尺寸 |
| RandomChoiceResize | 缩放图像至多个尺寸中的随机一个尺寸 |
| RandomGrayscale | 随机灰度化 |
| RandomFlip | 图像随机翻转 |
| 数据变换类 | 功能 |
|---|---|
| ToTensor | 将指定的数据转换为 torch.Tensor |
| ImageToTensor | 将图像转换为 torch.Tensor |
import random
import mmcv
from mmcv.transforms import BaseTransform, TRANSFORMS
@TRANSFORMS.register_module()
class MyFlip(BaseTransform):
def __init__(self, direction: str):
super().__init__()
self.direction = direction
def transform(self, results: dict) -> dict:
img = results['img']
results['img'] = mmcv.imflip(img, direction=self.direction)
return results
import numpy as np
transform = MyFlip(direction='horizontal')
data_dict = {'img': np.random.rand(224, 224, 3)}
data_dict = transform(data_dict)
processed_img = data_dict['img']
pipeline = [
...
dict(type='MyFlip', direction='horizontal'),
...
]
{
'metainfo':
{
'classes': ('cat', 'dog'),
...
},
'data_list':
[
{
'img_path': "xxx/xxx_0.jpg",
'img_label': 0,
...
},
{
'img_path': "xxx/xxx_1.jpg",
'img_label': 1,
...
},
...
]
}
data
├── annotations
│ ├── train.json
├── train
│ ├── xxx/xxx_0.jpg
│ ├── xxx/xxx_1.jpg
│ ├── ...

import os.path as osp
from mmengine.dataset import BaseDataset
class ToyDataset(BaseDataset):
# 以上面标注文件为例,在这里 raw_data_info 代表 `data_list` 对应列表里的某个字典:
# {
# 'img_path': "xxx/xxx_0.jpg",
# 'img_label': 0,
# ...
# }
def parse_data_info(self, raw_data_info):
data_info = raw_data_info
img_prefix = self.data_prefix.get('img_path', None)
if img_prefix is not None:
data_info['img_path'] = osp.join(
img_prefix, data_info['img_path'])
return data_info
class LoadImage:
def __call__(self, results):
results['img'] = cv2.imread(results['img_path'])
return results
class ParseImage:
def __call__(self, results):
results['img_shape'] = results['img'].shape
return results
pipeline = [
LoadImage(),
ParseImage(),
]
toy_dataset = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='train/'),
ann_file='annotations/train.json',
pipeline=pipeline)
toy_dataset.metainfo
# dict(classes=('cat', 'dog'))
toy_dataset.get_data_info(0)
# {
# 'img_path': "data/train/xxx/xxx_0.jpg",
# 'img_label': 0,
# ...
# }
len(toy_dataset)
# 2
toy_dataset[0]
# {
# 'img_path': "data/train/xxx/xxx_0.jpg",
# 'img_label': 0,
# 'img': a ndarray with shape (H, W, 3), which denotes the value of the image,
# 'img_shape': (H, W, 3) ,
# ...
# }
# `get_subset` 接口不对原数据集类做修改,即完全复制一份新的
sub_toy_dataset = toy_dataset.get_subset(1)
len(toy_dataset), len(sub_toy_dataset)
# 2, 1
# `get_subset_` 接口会对原数据集类做修改,即 inplace 的方式
toy_dataset.get_subset_(1)
len(toy_dataset)
# 1
经过以上步骤,可以了解基于数据集基类如何自定义新的数据集类,以及如何使用自定义数据集类。
自定义视频的数据集类:在上面的例子中,标注文件的每个原始数据只包含一个训练/测试样本(通常是图像领域)。如果每个原始数据包含若干个训练/测试样本(通常是视频领域),则只需保证 parse_data_info() 的返回值为 list[dict] 即可:
from mmengine.dataset import BaseDataset
class ToyVideoDataset(BaseDataset):
# raw_data_info 仍为一个字典,但它包含了多个样本
def parse_data_info(self, raw_data_info):
data_list = []
...
for ... :
data_info = dict()
...
data_list.append(data_info)
return data_list
pipeline = [
LoadImage(),
ParseImage(),
]
toy_dataset = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='train/'),
ann_file='annotations/train.json',
pipeline=pipeline,
# 在这里传入 lazy_init 变量
lazy_init=True)
# 完整初始化
toy_dataset.full_init()
# 初始化完毕,现在可以访问具体数据
len(toy_dataset)
# 2
toy_dataset[0]
# {
# 'img_path': "data/train/xxx/xxx_0.jpg",
# 'img_label': 0,
# 'img': a ndarray with shape (H, W, 3), which denotes the value the image,
# 'img_shape': (H, W, 3) ,
# ...
# }
数据集基类默认是将 data_list 序列化存入内存,也可以通过 serialize_data 变量(默认为 True)来控制是否提前将 data_list 序列化存入内存中:
pipeline = [
LoadImage(),
ParseImage(),
]
toy_dataset = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='train/'),
ann_file='annotations/train.json',
pipeline=pipeline,
# 在这里传入 serialize_data 变量
serialize_data=False)
from mmengine.dataset import ConcatDataset
pipeline = [
LoadImage(),
ParseImage(),
]
toy_dataset_1 = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='train/'),
ann_file='annotations/train.json',
pipeline=pipeline)
toy_dataset_2 = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='val/'),
ann_file='annotations/val.json',
pipeline=pipeline)
toy_dataset_12 = ConcatDataset(datasets=[toy_dataset_1, toy_dataset_2])
from mmengine.dataset import RepeatDataset
pipeline = [
LoadImage(),
ParseImage(),
]
toy_dataset = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='train/'),
ann_file='annotations/train.json',
pipeline=pipeline)
# 将数据集的 train 部分重复采样了 5 次。
toy_dataset_repeat = RepeatDataset(dataset=toy_dataset, times=5)
from mmengine.dataset import BaseDataset, ClassBalancedDataset
class ToyDataset(BaseDataset):
def parse_data_info(self, raw_data_info):
data_info = raw_data_info
img_prefix = self.data_prefix.get('img_path', None)
if img_prefix is not None:
data_info['img_path'] = osp.join(
img_prefix, data_info['img_path'])
return data_info
# 必须支持的方法,需要返回样本的类别
def get_cat_ids(self, idx):
data_info = self.get_data_info(idx)
return [int(data_info['img_label'])]
pipeline = [
LoadImage(),
ParseImage(),
]
toy_dataset = ToyDataset(
data_root='data/',
data_prefix=dict(img_path='train/'),
ann_file='annotations/train.json',
pipeline=pipeline)
toy_dataset_repeat = ClassBalancedDataset(dataset=toy_dataset, oversample_thr=1e-3)
# 训练单阶段检测器
for img, img_metas, gt_bboxes, gt_labels in data_loader:
loss = retinanet(img, img_metas, gt_bboxes, gt_labels)
# 训练 Mask R-CNN
for img, img_metas, gt_bboxes, gt_masks, gt_labels in data_loader:
loss = mask_rcnn(img, img_metas, gt_bboxes, gt_masks, gt_labels)
for img, data_sample in dataloader:
loss = model(img, data_sample)
import torch
from mmengine.structures import BaseDataElement
# 可以声明一个空的 object
data_element = BaseDataElement()
bboxes = torch.rand((5, 4)) # 假定 bboxes 是一个 Nx4 维的 tensor,N 代表框的个数
scores = torch.rand((5,)) # 假定框的分数是一个 N 维的 tensor,N 代表框的个数
img_id = 0 # 图像的 ID
H = 800 # 图像的高度
W = 1333 # 图像的宽度
# 直接设置 BaseDataElement 的 data 参数
data_element = BaseDataElement(bboxes=bboxes, scores=scores)
# 显式声明来设置 BaseDataElement 的参数 metainfo
data_element = BaseDataElement(
bboxes=bboxes,
scores=scores,
metainfo=dict(img_id=img_id, img_shape=(H, W)))
print(data_element)
"""
"""
data_element = BaseDataElement(
bboxes=torch.rand((5, 4)),
scores=torch.rand((5,)),
metainfo=dict(img_id=1, img_shape=(640, 640)))
# 可以在创建新 `BaseDataElement` 时设置 metainfo 和 data,使得新的 BaseDataElement 有相同未被设置的数据
data_element1 = data_element.new(metainfo=dict(img_id=2, img_shape=(320, 320)))
print('bboxes is in data_element1:', 'bboxes' in data_element1) # True
print('bboxes in data_element1 is same as bbox in data_element', (data_element1.bboxes == data_element.bboxes).all())
print('img_id in data_element1 is', data_element1.img_id == 2) # True
data_element2 = data_element.new(label=torch.rand(5,))
print('bboxes is not in data_element2', 'bboxes' not in data_element2) # True
print('img_id in data_element2 is same as img_id in data_element', data_element2.img_id == data_element.img_id)
print('label in data_element2 is', 'label' in data_element2)
# 也可以通过 `clone` 构建一个新的 object,新的 object 会拥有和 data_element 相同的 data 和 metainfo 内容以及状态。
data_element2 = data_element1.clone()
"""
bboxes is in data_element1: True
bboxes in data_element1 is same as bbox in data_element tensor(True)
img_id in data_element1 is True
bboxes is not in data_element2 True
img_id in data_element2 is same as img_id in data_element True
label in data_element2 is True
"""
data_element = BaseDataElement()
# 通过 `set_metainfo`设置 data_element 的 metainfo 字段,
# 同时 img_id 和 img_shape 成为 data_element 的属性
data_element.set_metainfo(dict(img_id=9, img_shape=(100, 100)))
# 查看 metainfo 的 key, value 和 item
print("metainfo'keys are", data_element.metainfo_keys())
print("metainfo'values are", data_element.metainfo_values())
for k, v in data_element.metainfo_items():
print(f'{k}: {v}')
print("通过类属性查看 img_id 和 img_shape")
print('img_id:', data_element.img_id)
print('img_shape:', data_element.img_shape)
"""
metainfo'keys are ['img_id', 'img_shape']
metainfo'values are [9, (100, 100)]
img_id: 9
img_shape: (100, 100)
通过类属性查看 img_id 和 img_shape
img_id: 9
img_shape: (100, 100)
"""
# 通过类属性直接设置 BaseDataElement 中的 data 字段
data_element.scores = torch.rand((5,))
data_element.bboxes = torch.rand((5, 4))
print("data's key is:", data_element.keys())
print("data's value is:", data_element.values())
for k, v in data_element.items():
print(f'{k}: {v}')
print("通过类属性查看 scores 和 bboxes")
print('scores:', data_element.scores)
print('bboxes:', data_element.bboxes)
print("通过 get() 查看 scores 和 bboxes")
print('scores:', data_element.get('scores', None))
print('bboxes:', data_element.get('bboxes', None))
print('fake:', data_element.get('fake', 'not exist'))
print("All key in data_element is:", data_element.all_keys())
print("The length of values in data_element is", len(data_element.all_values()))
for k, v in data_element.all_items():
print(f'{k}: {v}')
data_element = BaseDataElement(
bboxes=torch.rand((6, 4)), scores=torch.rand((6,)),
metainfo=dict(img_id=0, img_shape=(640, 640))
)
for k, v in data_element.all_items():
print(f'{k}: {v}')
# 对 data 进行修改
data_element.bboxes = data_element.bboxes * 2
data_element.scores = data_element.scores * -1
for k, v in data_element.items():
print(f'{k}: {v}')
# 删除 data 中的属性
del data_element.bboxes
for k, v in data_element.items():
print(f'{k}: {v}')
data_element.pop('scores', None)
print('The keys in data is', data_element.keys())
# 对 metainfo 进行修改
data_element.set_metainfo(dict(img_shape = (1280, 1280), img_id=10))
print(data_element.img_shape) # (1280, 1280)
for k, v in data_element.metainfo_items():
print(f'{k}: {v}')
# 提供了便捷的属性删除和访问操作 pop
del data_element.img_shape
for k, v in data_element.metainfo_items():
print(f'{k}: {v}')
data_element.pop('img_id')
print('The keys in metainfo is', data_element.metainfo_keys())
data_element = BaseDataElement(
bboxes=torch.rand((6, 4)), scores=torch.rand((6,)),
metainfo=dict(img_id=0, img_shape=(640, 640))
)
# 将所有 data 转移到 GPU 上
cuda_element_1 = data_element.cuda()
print('cuda_element_1 is on the device of', cuda_element_1.bboxes.device) # cuda:0
cuda_element_2 = data_element.to('cuda:0')
print('cuda_element_1 is on the device of', cuda_element_2.bboxes.device) # cuda:0
# 将所有 data 转移到 cpu 上
cpu_element_1 = cuda_element_1.cpu()
print('cpu_element_1 is on the device of', cpu_element_1.bboxes.device) # cpu
cpu_element_2 = cuda_element_2.to('cpu')
print('cpu_element_2 is on the device of', cpu_element_2.bboxes.device) # cpu
# 将所有 data 变成 FP16
fp16_instances = cuda_element_1.to(
device=None, dtype=torch.float16, non_blocking=False, copy=False,
memory_format=torch.preserve_format)
print('The type of bboxes in fp16_instances is', fp16_instances.bboxes.dtype) # torch.float16
# 阻断所有 data 的梯度
cuda_element_3 = cuda_element_2.detach()
print('The data in cuda_element_3 requires grad: ', cuda_element_3.bboxes.requires_grad)
# 转移 data 到 numpy array
np_instances = cpu_element_1.numpy()
print('The type of cpu_element_1 is convert to', type(np_instances.bboxes))
"""
>>> print(data_element)
>>>
>>> np_insts = data_element.numpy()
>>> print(np_insts)
"""
img_meta = dict(img_shape=(800, 1196, 3), pad_shape=(800, 1216, 3))
instance_data = BaseDataElement(metainfo=img_meta)
instance_data.det_labels = torch.LongTensor([0, 1, 2, 3])
instance_data.det_scores = torch.Tensor([0.01, 0.1, 0.2, 0.3])
print(instance_data)
from mmengine.structures import InstanceData
import torch
import numpy as np
img_meta = dict(img_shape=(800, 1196, 3), pad_shape=(800, 1216, 3))
instance_data = InstanceData(metainfo=img_meta)
instance_data.det_labels = torch.LongTensor([2, 3])
instance_data.det_scores = torch.Tensor([0.8, 0.7])
instance_data.bboxes = torch.rand((2, 4))
print('The length of instance_data is', len(instance_data)) # 2
instance_data.bboxes = torch.rand((3, 4))
"""
The length of instance_data is 2
AssertionError: the length of values 3 is not consistent with the length of this :obj:`InstanceData` 2
"""
img_meta = dict(img_shape=(800, 1196, 3), pad_shape=(800, 1216, 3))
instance_data = InstanceData(metainfo=img_meta)
instance_data["det_labels"] = torch.LongTensor([2, 3])
instance_data["det_scores"] = torch.Tensor([0.8, 0.7])
instance_data.bboxes = torch.rand((2, 4))
print(instance_data)
img_meta = dict(img_shape=(800, 1196, 3), pad_shape=(800, 1216, 3))
instance_data = InstanceData(metainfo=img_meta)
instance_data.det_labels = torch.LongTensor([2, 3])
instance_data.det_scores = torch.Tensor([0.8, 0.7])
instance_data.bboxes = torch.rand((2, 4))
print(instance_data)
"""
"""
print(instance_data[1])
"""
"""
print(instance_data[0:1])
"""
"""
sorted_results = instance_data[instance_data.det_scores.sort().indices]
print(sorted_results)
"""
"""
filter_results = instance_data[instance_data.det_scores > 0.75]
print(filter_results)
"""
"""
empty_results = instance_data[instance_data.det_scores > 1]
print(empty_results)
"""
"""
img_meta = dict(img_shape=(800, 1196, 3), pad_shape=(800, 1216, 3))
instance_data = InstanceData(metainfo=img_meta)
instance_data.det_labels = torch.LongTensor([2, 3])
instance_data.det_scores = torch.Tensor([0.8, 0.7])
instance_data.bboxes = torch.rand((2, 4))
print('The length of instance_data is', len(instance_data))
cat_results = InstanceData.cat([instance_data, instance_data])
print('The length of instance_data is', len(cat_results))
print(cat_results)
"""
The length of instance_data is 2
The length of instance_data is 4
"""
from mmengine.structures import PixelData
import random
import torch
import numpy as np
metainfo = dict(
img_id=random.randint(0, 100),
img_shape=(random.randint(400, 600), random.randint(400, 600)))
image = np.random.randint(0, 255, (4, 20, 40))
featmap = torch.randint(0, 255, (10, 20, 40))
pixel_data = PixelData(metainfo=metainfo,
image=image,
featmap=featmap)
print('The shape of pixel_data is', pixel_data.shape)
# set
pixel_data.map3 = torch.randint(0, 255, (20, 40))
print('The shape of pixel_data is', pixel_data.map3.shape)
pixel_data.map2 = torch.randint(0, 255, (3, 20, 30))
# AssertionError: the height and width of values (20, 30) is not consistent with the length of this :obj:`PixelData` (20, 40)
pixel_data.map2 = torch.randint(0, 255, (1, 3, 20, 40))
# AssertionError: The dim of value must be 2 or 3, but got 4
metainfo = dict(
img_id=random.randint(0, 100),
img_shape=(random.randint(400, 600), random.randint(400, 600)))
image = np.random.randint(0, 255, (4, 20, 40))
featmap = torch.randint(0, 255, (10, 20, 40))
pixel_data = PixelData(metainfo=metainfo,
image=image,
featmap=featmap)
print('The shape of pixel_data is', pixel_data.shape)
# The shape of pixel_data is (20, 40)
index_data = pixel_data[10, 20]
print('The shape of index_data is', index_data.shape)
# The shape of index_data is (1, 1)
slice_data = pixel_data[10:20, 20:40]
print('The shape of slice_data is', slice_data.shape)
# The shape of slice_data is (10, 20)
from mmengine.structures import LabelData
import torch
item = torch.tensor([1], dtype=torch.int64)
num_classes = 10
onehot = LabelData.label_to_onehot(label=item, num_classes=num_classes)
print(f'{num_classes} is convert to ', onehot)
index = LabelData.onehot_to_label(onehot=onehot)
print(f'{onehot} is convert to ', index)
"""
10 is convert to tensor([0, 1, 0, 0, 0, 0, 0, 0, 0, 0])
tensor([0, 1, 0, 0, 0, 0, 0, 0, 0, 0]) is convert to tensor([1])
"""
from mmdet.structures import DetDataSample
data_sample = DetDataSample()
data_sample.proposals = InstanceData(data=dict(bboxes=torch.rand((5,4))))
print(data_sample)
"""
) at 0x7f9f1c090430>
"""
class SingleStageDetector(BaseDetector):
...
def forward_train(self,
img,
img_metas,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None):
class SingleStageInstanceSegmentor(BaseDetector):
...
def forward_train(self,
img,
img_metas,
gt_masks,
gt_labels,
gt_bboxes=None,
gt_bboxes_ignore=None,
**kwargs):
class SingleStageDetector(BaseDetector):
...
def forward_train(self,
img,
data_samples):
class SingleStageInstanceSegmentor(BaseDetector):
...
def forward_train(self,
img,
data_samples):
class HungarianAssigner(BaseAssigner):
def assign(self,
bbox_pred,
cls_pred,
gt_bboxes,
gt_labels,
img_meta,
gt_bboxes_ignore=None,
eps=1e-7):
class MaskHungarianAssigner(BaseAssigner):
def assign(self,
cls_pred,
mask_pred,
gt_labels,
gt_mask,
img_meta,
gt_bboxes_ignore=None,
eps=1e-7):
class HungarianAssigner(BaseAssigner):
def assign(self,
pred_instances,
gt_instancess,
gt_instances_ignore=None,
eps=1e-7):
import torch
import mmcv
from mmengine.visualization import Visualizer
image = mmcv.imread('docs/en/_static/image/cat_and_dog.png', channel_order='rgb')
visualizer = Visualizer(image=image)
# 绘制单个检测框, xyxy 格式
visualizer.draw_bboxes(torch.tensor([72, 13, 179, 147]))
# 绘制多个检测框
visualizer.draw_bboxes(torch.tensor([[33, 120, 209, 220], [72, 13, 179, 147]]))
visualizer.show()
visualizer.set_image(image=image)
visualizer.draw_texts("cat and dog", torch.tensor([10, 20]))
visualizer.show()
visualizer.set_image(image=image)
visualizer.draw_bboxes(torch.tensor([72, 13, 179, 147]), edge_colors='r', line_widths=3)
visualizer.draw_bboxes(torch.tensor([[33, 120, 209, 220]]),line_styles='--')
visualizer.show()
上述绘制接口可以多次调用,从而实现叠加显示需求
visualizer.set_image(image=image)
visualizer.draw_bboxes(torch.tensor([[33, 120, 209, 220], [72, 13, 179, 147]]))
visualizer.draw_texts("cat and dog",
torch.tensor([10, 20])).draw_circles(torch.tensor([40, 50]), torch.tensor([20]))
visualizer.show()

@staticmethod
def draw_featmap(featmap: torch.Tensor, # 输入格式要求为 CHW
overlaid_image: Optional[np.ndarray] = None, # 如果同时输入了 image 数据,则特征图会叠加到 image 上绘制
channel_reduction: Optional[str] = 'squeeze_mean', # 多个通道压缩为单通道的策略
topk: int = 10, # 可选择激活度最高的 topk 个特征图显示
arrangement: Tuple[int, int] = (5, 2), # 多通道展开为多张图时候布局
resize_shape:Optional[tuple] = None, # 可以指定 resize_shape 参数来缩放特征图
alpha: float = 0.5) -> np.ndarray: # 图片和特征图绘制的叠加比例
import torch
import mmcv
import numpy as np
from mmengine.visualization import Visualizer
from torchvision.models import resnet18, ResNet18_Weights
from torchvision.transforms import Compose, Normalize, ToTensor
image = mmcv.imread('docs/en/_static/image/cat_and_dog.png', channel_order='rgb')
#visualizer = Visualizer(image=image)
visualizer = Visualizer(image=image, vis_backends=[dict(type='LocalVisBackend')], save_dir='temp_dir')
visualizer.show()
def preprocess_image(img, mean, std):
preprocessing = Compose([
ToTensor(),
Normalize(mean=mean, std=std)
])
return preprocessing(img.copy()).unsqueeze(0)
model = resnet18(weights=ResNet18_Weights.DEFAULT)
def _forward(x):
x = model.conv1(x)
x = model.bn1(x)
x = model.relu(x)
x = model.maxpool(x)
x1 = model.layer1(x)
x2 = model.layer2(x1)
x3 = model.layer3(x2)
x4 = model.layer4(x3)
return x4
model.forward = _forward
image_norm = np.float32(image) / 255
input_tensor = preprocess_image(image_norm,
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
feat = model(input_tensor)[0]
print(feat.size()) # torch.Size([512, 7, 7])
visualizer = Visualizer()
drawn_img = visualizer.draw_featmap(feat, channel_reduction='select_max')
visualizer.show(drawn_img)

drawn_img = visualizer.draw_featmap(feat, image, channel_reduction='select_max')
visualizer.show(drawn_img)

drawn_img = visualizer.draw_featmap(feat, image, channel_reduction=None, topk=5, arrangement=(2, 3))
visualizer.show(drawn_img)
drawn_img = visualizer.draw_featmap(feat, image, channel_reduction=None, topk=5, arrangement=(4, 2))
visualizer.show(drawn_img)

import torch
import mmcv
import numpy as np
from mmengine.visualization import Visualizer
from torchvision.models import resnet18, ResNet18_Weights
from torchvision.transforms import Compose, Normalize, ToTensor
image = mmcv.imread('docs/en/_static/image/cat_and_dog.png', channel_order='rgb')
visualizer = Visualizer(image=image, vis_backends=[dict(type='LocalVisBackend')], save_dir='temp_dir')
visualizer.draw_bboxes(torch.tensor([[33, 120, 209, 220], [72, 13, 179, 147]]))
visualizer.draw_texts("cat and dog", torch.tensor([10, 20]))
visualizer.draw_circles(torch.tensor([40, 50]), torch.tensor([20]))
# 会生成 temp_dir/vis_data/vis_image/demo_0.png
visualizer.add_image('demo', visualizer.get_image())
# 会生成 temp_dir/vis_data/vis_image/demo_1.png
visualizer.add_image('demo', visualizer.get_image(), step=1)
# 会生成 temp_dir/vis_data/vis_image/demo_3.png
visualizer.add_image('demo', visualizer.get_image(), step=3)
# TensorboardVisBackend
visualizer = Visualizer(image=image, vis_backends=[dict(type='TensorboardVisBackend')], save_dir='temp_dir')
# 或者 WandbVisBackend
visualizer = Visualizer(image=image, vis_backends=[dict(type='WandbVisBackend')], save_dir='temp_dir')
visualizer = Visualizer(vis_backends=[dict(type='LocalVisBackend')], save_dir='temp_dir')
drawn_img = visualizer.draw_featmap(feat, image, channel_reduction=None, topk=5, arrangement=(2, 3))
# 会生成 temp_dir/vis_data/vis_image/feat_0.png
visualizer.add_image('feat', drawn_img)
# 会生成 temp_dir/vis_data/scalars.json
# 保存 loss
visualizer.add_scalar('loss', 0.2, step=0)
visualizer.add_scalar('loss', 0.1, step=1)
# 保存 acc
visualizer.add_scalar('acc', 0.7, step=0)
visualizer.add_scalar('acc', 0.8, step=1)
# 会将内容追加到 temp_dir/vis_data/scalars.json
visualizer.add_scalars({'loss': 0.3, 'acc': 0.8}, step=3)
from mmengine import Config
cfg=Config.fromfile('tests/data/config/py_config/config.py')
# 会生成 temp_dir/vis_data/config.py
visualizer.add_config(cfg)
visualizer = Visualizer(image=image, vis_backends=[dict(type='TensorboardVisBackend'),
dict(type='LocalVisBackend')],
save_dir='temp_dir')
# 会生成 temp_dir/vis_data/events.out.tfevents.xxx 文件
visualizer.draw_bboxes(torch.tensor([[33, 120, 209, 220], [72, 13, 179, 147]]))
visualizer.draw_texts("cat and dog", torch.tensor([10, 20]))
visualizer.draw_circles(torch.tensor([40, 50]), torch.tensor([20]))
visualizer.add_image('demo', visualizer.get_image())
visualizer = Visualizer(image=image, vis_backends=[dict(type='TensorboardVisBackend', name='tb_1', save_dir='temp_dir_1'),
dict(type='TensorboardVisBackend', name='tb_2', save_dir='temp_dir_2'),
dict(type='LocalVisBackend', name='local')],
save_dir='temp_dir')
# 在程序初始化时候调用
visualizer1 = Visualizer.get_instance(name='vis', vis_backends=[dict(type='LocalVisBackend')])
# 在任何代码位置都可调用
visualizer2 = Visualizer.get_current_instance()
visualizer2.add_scalar('map', 0.7, step=0)
assert id(visualizer1) == id(visualizer2)
from mmengine.registry import VISUALIZERS
visualizer_cfg=dict(
type='Visualizer',
name='vis_new',
vis_backends=[dict(type='LocalVisBackend')])
VISUALIZERS.build(visualizer_cfg)
import torch
import mmcv
import numpy as np
from mmengine.visualization import Visualizer
image = mmcv.imread('docs/en/_static/image/cat_and_dog.png', channel_order='rgb')
visualizer = Visualizer(image=image, vis_backends=[dict(type='WandbVisBackend')],
save_dir='temp_dir')
# 获取 wandb 对象
wandb = visualizer.get_backend('WandbVisBackend').experiment
# 追加表格数据
table = wandb.Table(columns=["step", "mAP"])
table.add_data(1, 0.2)
table.add_data(2, 0.5)
table.add_data(3, 0.9)
# 保存
wandb.log({"table": table})
from mmengine.registry import VISBACKENDS
from mmengine.visualization import BaseVisBackend
@VISBACKENDS.register_module()
class DemoVisBackend(BaseVisBackend):
def add_image(self, **kwargs):
pass
visualizer = Visualizer(vis_backends=[dict(type='DemoVisBackend')], save_dir='temp_dir')
visualizer.add_image('demo',image)
from mmengine.registry import VISUALIZERS
@VISUALIZERS.register_module()
class DetLocalVisualizer(Visualizer):
def add_datasample(self,
name,
image: np.ndarray,
data_sample: Optional['BaseDataElement'] = None,
draw_gt: bool = True,
draw_pred: bool = True,
show: bool = False,
wait_time: int = 0,
step: int = 0) -> None:
pass
visualizer_cfg = dict(
type='DetLocalVisualizer', vis_backends=[dict(type='WandbVisBackend')], name='visualizer')
# 全局初始化
VISUALIZERS.build(visualizer_cfg)
# 任意代码位置
det_local_visualizer = Visualizer.get_current_instance()
det_local_visualizer.add_datasample('det', image, data_sample)
log_processor = dict(window_size=10, by_epoch=True, custom_cfg=None, num_digits=4)
runner = Runner(
model=ResNet18(),
work_dir='./work_dir',
train_dataloader=train_dataloader_cfg,
optim_wrapper=dict(optimizer=dict(type='SGD', lr=0.001, momentum=0.9)),
train_cfg=dict(by_epoch=True, max_epochs=3),
resume=True,
)
runner.train()
runner = Runner(
model=ResNet18(),
work_dir='./work_dir',
train_dataloader=train_dataloader_cfg,
optim_wrapper=dict(optimizer=dict(type='SGD', lr=0.001, momentum=0.9)),
train_cfg=dict(by_epoch=True, max_epochs=3),
load_from='./work_dir/epoch_2.pth',
resume=True,
)
runner.train()
python -m torch.distributed.launch --nproc_per_node=8 examples/train.py --launcher pytorch
CUDA_VISIBLE_DEVICES=0,3 python -m torch.distributed.launch --nproc_per_node=2 examples/train.py --launcher pytorch
python -m torch.distributed.launch \
--nnodes 8 \
--node_rank 0 \
--master_addr 127.0.0.1 \
--master_port 29500 \
--nproc_per_node=8 \
examples/train.py --launcher pytorch
python -m torch.distributed.launch \
--nnodes 8 \
--node_rank 1 \
--master_addr 127.0.0.1 \
--master_port 29500 \
--nproc_per_node=8 \
examples/train.py --launcher pytorch
srun -p mm_dev \
--job-name=test \
--gres=gpu:8 \
--ntasks=16 \
--ntasks-per-node=8 \
--cpus-per-task=5 \
--kill-on-bad-exit=1 \
python examples/train.py --launcher="slurm"
runner = Runner(
model=ResNet18(),
work_dir='./work_dir',
train_dataloader=train_dataloader_cfg,
optim_wrapper=dict(type='AmpOptimWrapper', optimizer=dict(type='SGD', lr=0.001, momentum=0.9)),
train_cfg=dict(by_epoch=True, max_epochs=3),
)
runner.train()
optim_wrapper_cfg = dict(
type='OptimWrapper',
optimizer=dict(type='SGD', lr=0.001, momentum=0.9),
# 累加 4 次参数更新一次
accumulative_counts=4)
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from mmengine.runner import Runner
from mmengine.model import BaseModel
train_dataset = [(torch.ones(1, 1), torch.ones(1, 1))] * 50
train_dataloader = DataLoader(train_dataset, batch_size=2)
class ToyModel(BaseModel):
def __init__(self) -> None:
super().__init__()
self.linear = nn.Linear(1, 1)
def forward(self, img, label, mode):
feat = self.linear(img)
loss1 = (feat - label).pow(2)
loss2 = (feat - label).abs()
return dict(loss1=loss1, loss2=loss2)
runner = Runner(
model=ToyModel(),
work_dir='tmp_dir',
train_dataloader=train_dataloader,
train_cfg=dict(by_epoch=True, max_epochs=1),
optim_wrapper=dict(optimizer=dict(type='SGD', lr=0.01),
accumulative_counts=4)
)
runner.train()
# 位于 cfg 配置文件中
model_wrapper_cfg=dict(type='MMFullyShardedDataParallel', cpu_offload=True)
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from mmengine.runner import Runner
from mmengine.model import BaseModel
train_dataset = [(torch.ones(1, 1), torch.ones(1, 1))] * 50
train_dataloader = DataLoader(train_dataset, batch_size=2)
class ToyModel(BaseModel):
def __init__(self) -> None:
super().__init__()
self.linear = nn.Linear(1, 1)
def forward(self, img, label, mode):
feat = self.linear(img)
loss1 = (feat - label).pow(2)
loss2 = (feat - label).abs()
return dict(loss1=loss1, loss2=loss2)
runner = Runner(
model=ToyModel(),
work_dir='tmp_dir',
train_dataloader=train_dataloader,
train_cfg=dict(by_epoch=True, max_epochs=1),
optim_wrapper=dict(optimizer=dict(type='SGD', lr=0.01)),
cfg=dict(model_wrapper_cfg=dict(type='MMFullyShardedDataParallel', cpu_offload=True))
)
runner.train()
# 使用 custom_imports 将 mmcls 的 models 添加进注册器
custom_imports = dict(imports=['mmcls.models'], allow_failed_imports=False)
model = dict(
type='MaskRCNN',
data_preprocessor=dict(...),
backbone=dict(
type='mmcls.ConvNeXt', # 添加 mmcls 前缀完成跨库调用
arch='tiny',
out_indices=[0, 1, 2, 3],
drop_path_rate=0.4,
layer_scale_init_value=1.0,
gap_before_final_norm=False,
init_cfg=dict(
type='Pretrained',
checkpoint=
'https://download.openmmlab.com/mmclassification/v0/convnext/downstream/convnext-tiny_3rdparty_32xb128-noema_in1k_20220301-795e9634.pth',
prefix='backbone.')),
neck=dict(...),
rpn_head=dict(...))
# 使用 custom_imports 将 mmdet 的 transforms 添加进注册器
custom_imports = dict(imports=['mmdet.datasets.transforms'], allow_failed_imports=False)
# 添加 mmdet 前缀完成跨库调用
train_pipeline=[
dict(type='mmdet.LoadImageFromFile'),
dict(type='mmdet.LoadAnnotations', with_bbox=True, box_type='qbox'),
dict(type='ConvertBoxType', box_type_mapping=dict(gt_bboxes='rbox')),
dict(type='mmdet.Resize', scale=(1024, 2014), keep_ratio=True),
dict(type='mmdet.RandomFlip', prob=0.5),
dict(type='mmdet.PackDetInputs')
]
# 使用 custom_imports 将 mmdet 的 models 添加进注册器
custom_imports = dict(imports=['mmdet.models'], allow_failed_imports=False)
model = dict(
type='mmdet.YOLOX',
backbone=dict(type='mmdet.CSPDarknet', deepen_factor=1.33, widen_factor=1.25),
neck=dict(
type='mmdet.YOLOXPAFPN',
in_channels=[320, 640, 1280],
out_channels=320,
num_csp_blocks=4),
bbox_head=dict(
type='mmdet.YOLOXHead', num_classes=1, in_channels=320, feat_channels=320),
train_cfg=dict(assigner=dict(type='mmdet.SimOTAAssigner', center_radius=2.5)))
# 使用 custom_imports 将 mmdet 的 models 添加进注册器
custom_imports = dict(imports=['mmdet.models'], allow_failed_imports=False)
model = dict(
_scope_='mmdet', # 使用 _scope_ 关键字,避免给所有子模块添加前缀
type='YOLOX',
backbone=dict(type='CSPDarknet', deepen_factor=1.33, widen_factor=1.25),
neck=dict(
type='YOLOXPAFPN',
in_channels=[320, 640, 1280],
out_channels=320,
num_csp_blocks=4),
bbox_head=dict(
type='YOLOXHead', num_classes=1, in_channels=320, feat_channels=320),
train_cfg=dict(assigner=dict(type='SimOTAAssigner', center_radius=2.5)))