# mogen/datasets/motionverse_dataset.py
import copy
import json
import os
import pickle as pkl
from typing import List, Optional

import numpy as np
import torch
from torch.utils.data import ConcatDataset, Dataset

from mogen.core.evaluation import build_evaluator
from mogen.models.builder import build_submodule

from .builder import DATASETS
from .pipelines import Compose


class SingleMotionVerseDataset(Dataset):
"""
A dataset class for handling single MotionVerse datasets.
Args:
dataset_name (str): Name of the dataset and task to load.
data_prefix (str): Path to the directory containing the dataset.
ann_file (str): Path to the annotation file.
pipeline (list): A list of transformations to apply on the data.
mode (str): the mode of current work. Choices: ['pretrain', 'train', 'test'].
eval_cfg (dict): Configuration for evaluation metrics.
"""
def __init__(self,
dataset_path: Optional[str] = None,
task_name: Optional[str] = None,
data_prefix: Optional[str] = None,
ann_file: Optional[str] = None,
pipeline: Optional[List[dict]] = None,
# for text2motion and speech2gesture
tgt_min_motion_length: int = 20,
tgt_max_motion_length: int = 200,
# for video2motion
v2m_window_size: int = 20,
# for motion prediction
mp_input_length: int = 50,
mp_output_length: int = 25,
mp_stride_step: int = 5,
# for general test
test_rotation_type: str = 'h3d_rot',
target_framerate: float = 20,
eval_cfg: Optional[dict] = None,
test_mode: Optional[bool] = False):
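        # The on-disk layout is assumed to be
        # <data_prefix>/datasets/<dataset_path>/{metas,motions,texts,...,splits}.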
data_prefix = os.path.join(data_prefix, 'datasets', dataset_path)
self.dataset_path = dataset_path
assert task_name in ['mocap', 't2m', 'v2m', 's2g', 'm2d']
self.task_name = task_name
self.dataset_name = dataset_path + '_' + task_name
# define subdirectories
self.meta_dir = os.path.join(data_prefix, 'metas')
self.motion_dir = os.path.join(data_prefix, 'motions')
self.eval_motion_dir = os.path.join(data_prefix, 'eval_motions')
self.text_dir = os.path.join(data_prefix, 'texts')
self.text_feat_dir = os.path.join(data_prefix, 'text_feats')
self.speech_dir = os.path.join(data_prefix, 'speeches')
self.speech_feat_dir = os.path.join(data_prefix, 'speech_feats')
self.music_dir = os.path.join(data_prefix, 'musics')
self.music_feat_dir = os.path.join(data_prefix, 'music_feats')
self.video_feat_dir = os.path.join(data_prefix, 'video_feats')
self.anno_file = os.path.join(data_prefix, 'splits', ann_file)
self.pipeline = Compose(pipeline)
self.tgt_min_motion_length = tgt_min_motion_length
self.tgt_max_motion_length = tgt_max_motion_length
self.v2m_window_size = v2m_window_size
self.mp_input_length = mp_input_length
self.mp_output_length = mp_output_length
self.mp_stride_step = mp_stride_step
self.target_framerate = target_framerate
self.test_rotation_type = test_rotation_type
self.test_mode = test_mode
self.load_annotations()
self.eval_cfg = copy.deepcopy(eval_cfg)
if self.test_mode:
self.prepare_evaluation()
def __len__(self) -> int:
"""Return the length of the current dataset."""
if self.test_mode:
return len(self.eval_indexes)
return len(self.name_list)
def __getitem__(self, idx: int) -> dict:
"""Prepare data for the given index."""
if self.test_mode:
idx = self.eval_indexes[idx]
return self.prepare_data(idx)
def load_annotations(self):
if self.task_name == 'mocap':
self.load_annotations_mocap()
elif self.task_name == 't2m':
self.load_annotations_t2m()
elif self.task_name == 'v2m':
self.load_annotations_v2m()
elif self.task_name == 's2g':
self.load_annotations_s2g()
elif self.task_name == 'm2d':
self.load_annotations_m2d()
else:
raise NotImplementedError()
def load_annotations_mocap(self):
if self.test_mode:
self.name_list = []
self.src_start_frame = []
self.src_end_frame = []
self.tgt_start_frame = []
self.tgt_end_frame = []
tgt_motion_length = self.mp_input_length + self.mp_output_length
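            # Slide a window of mp_input_length observed + mp_output_length
            # predicted frames over each sequence. Source indices are kept at
            # the original framerate; target indices are in downsampled frames.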
for name in open(self.anno_file):
name = name.strip()
meta_path = os.path.join(self.meta_dir, name + ".json")
meta_data = json.load(open(meta_path))
num_frames = meta_data['num_frames']
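                # Integer downsampling factor to the target framerate; the +0.1
                # presumably guards against floating-point truncation.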
downrate = int(meta_data['framerate'] / self.target_framerate + 0.1)
if num_frames < (self.mp_input_length + self.mp_output_length) * downrate:
continue
lim = num_frames // downrate - tgt_motion_length
for start_frame in range(0, lim, self.mp_stride_step):
self.name_list.append(name)
self.src_start_frame.append((start_frame + 1) * downrate)
self.src_end_frame.append((start_frame + tgt_motion_length + 1) * downrate)
self.tgt_start_frame.append(start_frame + self.mp_input_length)
self.tgt_end_frame.append(start_frame + tgt_motion_length)
else:
self.name_list = []
for name in open(self.anno_file):
name = name.strip()
self.name_list.append(name)
def load_annotations_t2m(self):
self.name_list = []
self.text_idx = []
for name in open(self.anno_file):
name = name.strip()
meta_path = os.path.join(self.meta_dir, name + ".json")
meta_data = json.load(open(meta_path))
downrate = int(meta_data['framerate'] / self.target_framerate + 0.1)
text_path = os.path.join(self.text_dir, name + ".json")
text_data = json.load(open(text_path))
for i, anno in enumerate(text_data):
start_frame = anno['start_frame'] // downrate
end_frame = min(anno['end_frame'], meta_data['num_frames']) // downrate
num_frame = end_frame - start_frame
if num_frame < self.tgt_min_motion_length or num_frame > self.tgt_max_motion_length:
continue
if len(anno['body_text']) > 0:
self.name_list.append(name)
self.text_idx.append(i)
def load_annotations_v2m(self):
if not self.test_mode:
self.name_list = []
for name in open(self.anno_file):
name = name.strip()
self.name_list.append(name)
else:
self.name_list = []
self.start_frame = []
self.end_frame = []
self.valid_start_frame = []
self.valid_end_frame = []
for name in open(self.anno_file):
name = name.strip()
meta_path = os.path.join(self.meta_dir, name + ".json")
meta_data = json.load(open(meta_path))
num_frames = meta_data['num_frames']
assert num_frames >= self.v2m_window_size
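                # Cover the sequence with non-overlapping windows; the final
                # window is right-aligned at the sequence end, and valid_* mark
                # the not-yet-covered frames so the overlap is not double-counted.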
cur_idx = 0
while cur_idx < num_frames:
if cur_idx + self.v2m_window_size < num_frames:
self.name_list.append(name)
self.start_frame.append(cur_idx)
self.end_frame.append(cur_idx + self.v2m_window_size)
self.valid_start_frame.append(cur_idx)
self.valid_end_frame.append(cur_idx + self.v2m_window_size)
cur_idx += self.v2m_window_size
else:
self.name_list.append(name)
self.start_frame.append(num_frames - self.v2m_window_size)
self.end_frame.append(num_frames)
self.valid_start_frame.append(cur_idx)
self.valid_end_frame.append(num_frames)
break
def load_annotations_s2g(self):
self.name_list = []
self.speech_idx = []
for name in open(self.anno_file):
name = name.strip()
meta_path = os.path.join(self.meta_dir, name + ".json")
meta_data = json.load(open(meta_path))
downrate = int(meta_data['framerate'] / self.target_framerate + 0.1)
speech_path = os.path.join(self.speech_dir, name + ".json")
speech_data = json.load(open(speech_path))
for i, anno in enumerate(speech_data):
start_frame = anno['start_frame'] // downrate
end_frame = min(anno['end_frame'], meta_data['num_frames']) // downrate
num_frame = end_frame - start_frame
if num_frame < self.tgt_min_motion_length or num_frame > self.tgt_max_motion_length:
continue
self.name_list.append(name)
self.speech_idx.append(i)
def load_annotations_m2d(self):
self.name_list = []
self.music_idx = []
for name in open(self.anno_file):
name = name.strip()
meta_path = os.path.join(self.meta_dir, name + ".json")
meta_data = json.load(open(meta_path))
downrate = int(meta_data['framerate'] / self.target_framerate + 0.1)
music_path = os.path.join(self.music_dir, name + ".json")
music_data = json.load(open(music_path))
for i, anno in enumerate(music_data):
start_frame = anno['start_frame'] // downrate
end_frame = min(anno['end_frame'], meta_data['num_frames']) // downrate
num_frame = end_frame - start_frame
if num_frame < self.tgt_min_motion_length or num_frame > self.tgt_max_motion_length:
continue
self.name_list.append(name)
self.music_idx.append(i)
def prepare_data_base(self, idx: int) -> dict:
results = {}
name = self.name_list[idx]
results['motion_path'] = os.path.join(self.motion_dir, name + ".npz")
meta_path = os.path.join(self.meta_dir, name + ".json")
meta_data = json.load(open(meta_path))
meta_data['dataset_name'] = self.dataset_name
results['meta_data'] = meta_data
results['meta_data']['sample_idx'] = idx
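        # Placeholder features for every modality; the *_cond flags default to
        # 0 (unconditioned), and the task-specific loaders below overwrite the
        # relevant entries.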
        results.update({
            'text_word_feat': np.zeros((77, 1024), dtype=np.float32),
            'text_seq_feat': np.zeros(1024, dtype=np.float32),
            'text_cond': 0,
            'music_word_feat': np.zeros((229, 768), dtype=np.float32),
            'music_seq_feat': np.zeros(1024, dtype=np.float32),
            'music_cond': 0,
            'speech_word_feat': np.zeros((229, 768), dtype=np.float32),
            'speech_seq_feat': np.zeros(1024, dtype=np.float32),
            'speech_cond': 0,
            'video_seq_feat': np.zeros(1024, dtype=np.float32),
            'video_cond': 0,
        })
return results
def prepare_data(self, idx: int) -> dict:
if self.task_name == 'mocap':
results = self.prepare_data_mocap(idx)
elif self.task_name == 't2m':
results = self.prepare_data_t2m(idx)
elif self.task_name == 'v2m':
results = self.prepare_data_v2m(idx)
elif self.task_name == 's2g':
results = self.prepare_data_s2g(idx)
elif self.task_name == 'm2d':
results = self.prepare_data_m2d(idx)
else:
raise NotImplementedError()
results = self.pipeline(results)
return results
def prepare_data_mocap(self, idx: int) -> dict:
results = self.prepare_data_base(idx)
if self.test_mode:
results['meta_data']['start_frame'] = self.src_start_frame[idx]
results['meta_data']['end_frame'] = self.src_end_frame[idx]
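            # Context mask for motion prediction: 1 marks observed (context)
            # frames, 0 marks frames to be predicted.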
results['context_mask'] = np.concatenate(
(np.ones((self.mp_input_length - 1)), np.zeros((self.mp_output_length))),
axis=-1
)
return results
def prepare_data_t2m(self, idx: int) -> dict:
results = self.prepare_data_base(idx)
name = self.name_list[idx]
text_idx = self.text_idx[idx]
text_path = os.path.join(self.text_dir, name + ".json")
text_data = json.load(open(text_path))[text_idx]
text_feat_path = os.path.join(self.text_feat_dir, name + ".pkl")
text_feat_data = pkl.load(open(text_feat_path, "rb"))
text_list = text_data['body_text']
tid = np.random.randint(len(text_list))
text = text_list[tid]
text_word_feat = text_feat_data['text_word_feats'][text_idx][tid]
text_seq_feat = text_feat_data['text_seq_feats'][text_idx][tid]
assert text_word_feat.shape[0] == 77
assert text_word_feat.shape[1] == 1024
assert text_seq_feat.shape[0] == 1024
if self.test_mode:
motion_path = os.path.join(self.eval_motion_dir, name + ".npy")
motion_data = np.load(motion_path)
assert not np.isnan(motion_data).any()
downrate = int(results['meta_data']['framerate'] / self.target_framerate + 0.1)
start_frame = text_data['start_frame'] // downrate
end_frame = text_data['end_frame'] // downrate
motion_data = motion_data[start_frame: end_frame]
results['meta_data']['framerate'] = self.target_framerate
results['meta_data']['rotation_type'] = self.test_rotation_type
assert motion_data.shape[0] > 0
if 'body_tokens' in text_data:
token = text_data['body_tokens'][tid]
else:
token = ""
text_cond = 1
results.update({
'motion': motion_data,
'text_word_feat': text_word_feat,
'text_seq_feat': text_seq_feat,
'text_cond': text_cond,
'text': text,
'token': token
})
else:
results['meta_data']['start_frame'] = text_data['start_frame']
results['meta_data']['end_frame'] = text_data['end_frame']
text_cond = 1
results.update({
'text_word_feat': text_word_feat,
'text_seq_feat': text_seq_feat,
'text_cond': text_cond
})
return results
def prepare_data_v2m(self, idx: int) -> dict:
results = self.prepare_data_base(idx)
name = self.name_list[idx]
video_feat_path = os.path.join(self.video_feat_dir, name + ".pkl")
video_feat_data = pkl.load(open(video_feat_path, "rb"))
video_word_feat = video_feat_data['video_word_feats']
video_seq_feat = video_feat_data['video_seq_feats']
assert video_word_feat.shape[0] == results['meta_data']['num_frames']
assert video_word_feat.shape[1] == 1024
assert video_seq_feat.shape[0] == 1024
video_cond = 1
if self.test_mode:
results['meta_data']['start_frame'] = self.start_frame[idx]
results['meta_data']['end_frame'] = self.end_frame[idx]
motion_path = os.path.join(self.eval_motion_dir, name + ".npy")
motion_data = np.load(motion_path)
assert not np.isnan(motion_data).any()
start_frame = self.start_frame[idx]
end_frame = self.end_frame[idx]
motion_data = motion_data[start_frame: end_frame]
video_word_feat = video_word_feat[start_frame: end_frame]
results['meta_data']['framerate'] = self.target_framerate
results['meta_data']['rotation_type'] = self.test_rotation_type
assert motion_data.shape[0] > 0
results.update({
'motion': motion_data,
'video_word_feat': video_word_feat,
'video_seq_feat': video_seq_feat,
'video_cond': video_cond
})
else:
results.update({
'video_word_feat': video_word_feat,
'video_seq_feat': video_seq_feat,
'video_cond': video_cond
})
return results
def prepare_data_s2g(self, idx: int) -> dict:
results = self.prepare_data_base(idx)
name = self.name_list[idx]
speech_idx = self.speech_idx[idx]
speech_path = os.path.join(self.speech_dir, name + ".json")
speech_data = json.load(open(speech_path))[speech_idx]
speech_feat_path = os.path.join(self.speech_feat_dir, name + ".pkl")
speech_feat_data = pkl.load(open(speech_feat_path, "rb"))
        try:
            speech_word_feat = speech_feat_data['audio_word_feats'][speech_idx]
            speech_seq_feat = speech_feat_data['audio_seq_feats'][speech_idx]
        except KeyError:
            # Some feature files store the arrays under 'speech_*' keys instead
            # of 'audio_*'.
            speech_word_feat = speech_feat_data['speech_word_feats'][speech_idx]
            speech_seq_feat = speech_feat_data['speech_seq_feats'][speech_idx]
del speech_feat_data
assert speech_word_feat.shape[0] == 229
assert speech_word_feat.shape[1] == 768
assert speech_seq_feat.shape[0] == 1024
results['meta_data']['start_frame'] = speech_data['start_frame']
results['meta_data']['end_frame'] = speech_data['end_frame']
speech_cond = 1
results.update({
'speech_word_feat': speech_word_feat,
'speech_seq_feat': speech_seq_feat,
'speech_cond': speech_cond
})
if self.test_mode:
results['meta_data']['framerate'] = self.target_framerate
results['meta_data']['rotation_type'] = self.test_rotation_type
eval_data_path = os.path.join(self.eval_motion_dir, name + ".npz")
eval_data = np.load(eval_data_path)
motion_data = eval_data["bvh_rot_beat141"]
sem_data = eval_data["sem"]
wav_data = eval_data["wave16k"]
assert not np.isnan(motion_data).any()
start_frame = results['meta_data']['start_frame']
end_frame = results['meta_data']['end_frame']
            # Audio is sampled at 16 kHz; convert motion-frame indices to
            # audio-sample indices (slice bounds must be integers).
            wav_start_frame = int(start_frame / results['meta_data']['framerate'] * 16000)
            wav_end_frame = int(end_frame / results['meta_data']['framerate'] * 16000)
            motion_data = motion_data[start_frame: end_frame]
            sem_data = sem_data[start_frame: end_frame]
            wav_data = wav_data[wav_start_frame: wav_end_frame]
assert motion_data.shape[0] > 0
results.update({
'motion': motion_data,
'sem_score': sem_data,
'wav_feat': wav_data
})
return results
def prepare_data_m2d(self, idx: int) -> dict:
results = self.prepare_data_base(idx)
name = self.name_list[idx]
music_idx = self.music_idx[idx]
music_path = os.path.join(self.music_dir, name + ".json")
music_data = json.load(open(music_path))[music_idx]
music_feat_path = os.path.join(self.music_feat_dir, name + ".pkl")
music_feat_data = pkl.load(open(music_feat_path, "rb"))
music_word_feat = music_feat_data['audio_word_feats'][music_idx]
music_seq_feat = music_feat_data['audio_seq_feats'][music_idx]
assert music_word_feat.shape[0] == 229
assert music_word_feat.shape[1] == 768
assert music_seq_feat.shape[0] == 1024
results['meta_data']['start_frame'] = music_data['start_frame']
results['meta_data']['end_frame'] = music_data['end_frame']
music_cond = 1
results.update({
'music_word_feat': music_word_feat,
'music_seq_feat': music_seq_feat,
'music_cond': music_cond
})
return results
def prepare_evaluation(self):
"""
Prepare the dataset for evaluation by initializing evaluators and creating evaluation indexes.
"""
self.evaluators = []
self.eval_indexes = []
self.evaluator_model = build_submodule(self.eval_cfg.get('evaluator_model', None))
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if self.evaluator_model is not None:
self.evaluator_model = self.evaluator_model.to(device)
self.evaluator_model.eval()
self.eval_cfg['evaluator_model'] = self.evaluator_model
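        # Each replication gets its own (optionally shuffled) copy of the
        # sample indexes; the metric-specific builders below may further expand
        # or reorder them before everything is concatenated into one order.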
for _ in range(self.eval_cfg['replication_times']):
eval_indexes = np.arange(len(self.name_list))
if self.eval_cfg.get('shuffle_indexes', False):
np.random.shuffle(eval_indexes)
self.eval_indexes.append(eval_indexes)
for metric in self.eval_cfg['metrics']:
evaluator, self.eval_indexes = build_evaluator(
metric, self.eval_cfg, len(self.name_list), self.eval_indexes)
self.evaluators.append(evaluator)
self.eval_indexes = np.concatenate(self.eval_indexes)
def process_outputs(self, results):
return results
def evaluate(self, results: List[dict], work_dir: str, logger=None) -> dict:
"""
Evaluate the model performance based on the results.
Args:
results (list): A list of result dictionaries.
work_dir (str): Directory where evaluation logs will be stored.
logger: Logger object to record evaluation results (optional).
Returns:
dict: Dictionary containing evaluation metrics.
"""
metrics = {}
results = self.process_outputs(results)
for evaluator in self.evaluators:
metrics.update(evaluator.evaluate(results))
if logger is not None:
logger.info(metrics)
eval_output = os.path.join(work_dir, 'eval_results.log')
with open(eval_output, 'w') as f:
for k, v in metrics.items():
f.write(k + ': ' + str(v) + '\n')
return metrics
def create_single_dataset(cfg: dict):
    dataset_map = {
        'amass': MotionVerseAMASS,
        'humanml3d': MotionVerseH3D,
        'kitml': MotionVerseKIT,
        'babel': MotionVerseBABEL,
        'motionx': MotionVerseMotionX,
        'humanact12': MotionVerseACT12,
        'uestc': MotionVerseUESTC,
        'ntu': MotionVerseNTU,
        'h36m': MotionVerseH36M,
        'mpi': MotionVerseMPI,
        'pw3d': MotionVersePW3D,
        'aist': MotionVerseAIST,
        'beat': MotionVerseBEAT,
        'tedg': MotionVerseTEDG,
        'tedex': MotionVerseTEDEx,
        's2g3d': MotionVerseS2G3D,
    }
    dataset_path = cfg['dataset_path']
    if dataset_path not in dataset_map:
        raise NotImplementedError(f'Unsupported dataset: {dataset_path}')
    return dataset_map[dataset_path](**cfg)
@DATASETS.register_module()
class MotionVerse(Dataset):
"""
A dataset class that handles multiple MotionBench datasets.
Args:
dataset_cfgs (list[str]): List of dataset configurations.
partitions (list[float]): List of partition weights corresponding to the datasets.
num_data (Optional[int]): Number of data samples to load. Defaults to None.
data_prefix (str): Path to the directory containing the dataset.
"""
def __init__(self,
dataset_cfgs: List[dict],
                 partitions: List[float],
num_data: Optional[int] = None,
data_prefix: Optional[str] = None):
"""Load data from multiple datasets."""
assert min(partitions) >= 0
assert len(dataset_cfgs) == len(partitions)
datasets = []
new_partitions = []
for idx, cfg in enumerate(dataset_cfgs):
if partitions[idx] == 0:
continue
new_partitions.append(partitions[idx])
cfg.update({
'data_prefix': data_prefix
})
datasets.append(create_single_dataset(cfg))
self.dataset = ConcatDataset(datasets)
if num_data is not None:
self.length = num_data
else:
self.length = max(len(ds) for ds in datasets)
partitions = new_partitions
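        # Per-sample weights: every dataset contributes a total weight equal to
        # its partition value regardless of its size, so a weighted sampler
        # (presumably consumed outside this class) draws datasets in the
        # configured proportions.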
weights = [np.ones(len(ds)) * p / len(ds) for (p, ds) in zip(partitions, datasets)]
weights = np.concatenate(weights, axis=0)
self.weights = weights
self.task_proj = {
'mocap': 0,
't2m': 1,
'v2m': 2,
's2g': 3,
'm2d': 4
}
self.task_idx_list = []
for ds in datasets:
self.task_idx_list += [self.task_proj[ds.task_name]] * len(ds)
def __len__(self) -> int:
"""Get the size of the dataset."""
return self.length
def __getitem__(self, idx: int) -> dict:
"""Given an index, sample data from multiple datasets with the specified proportion."""
return self.dataset[idx]
def get_task_idx(self, idx: int) -> int:
return self.task_idx_list[idx]
@DATASETS.register_module()
class MotionVerseEval(Dataset):
def __init__(self,
eval_cfgs: dict,
testset: str,
test_mode: bool = True):
"""Load data from multiple datasets."""
assert testset in eval_cfgs
dataset_path, task_name = testset.split('_')
dataset_cfg = eval_cfgs[testset]
dataset_cfg['dataset_path'] = dataset_path
dataset_cfg['task_name'] = task_name
dataset_cfg['test_mode'] = test_mode
self.dataset = create_single_dataset(dataset_cfg)
def __len__(self) -> int:
return len(self.dataset)
def __getitem__(self, idx: int) -> dict:
return self.dataset[idx]
    def load_annotations(self):
        self.dataset.load_annotations()
def prepare_data(self, idx: int) -> dict:
return self.dataset.prepare_data(idx)
def prepare_evaluation(self):
self.dataset.prepare_evaluation()
def process_outputs(self, results):
return self.dataset.process_outputs(results)
def evaluate(self, results: List[dict], work_dir: str, logger=None) -> dict:
return self.dataset.evaluate(results=results, work_dir=work_dir, logger=logger)
@DATASETS.register_module()
class MotionVerseAMASS(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'amass'
task_name = kwargs['task_name']
assert task_name in ['mocap']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseH3D(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'humanml3d'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseKIT(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'kitml'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseBABEL(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'babel'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseMotionX(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'motionx'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseACT12(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'humanact12'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseUESTC(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'uestc'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseNTU(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'ntu'
task_name = kwargs['task_name']
assert task_name in ['mocap', 't2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseH36M(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'h36m'
task_name = kwargs['task_name']
assert task_name in ['mocap', 'v2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseMPI(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'mpi'
task_name = kwargs['task_name']
assert task_name in ['mocap', 'v2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVersePW3D(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = '3dpw'
task_name = kwargs['task_name']
assert task_name in ['mocap', 'v2m']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseAIST(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'aist'
task_name = kwargs['task_name']
assert task_name in ['mocap', 'm2d']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseBEAT(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'beat'
task_name = kwargs['task_name']
assert task_name in ['mocap', 's2g']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseTEDG(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'tedg'
task_name = kwargs['task_name']
assert task_name in ['mocap', 's2g']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseTEDEx(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 'tedex'
task_name = kwargs['task_name']
assert task_name in ['mocap', 's2g']
super().__init__(**kwargs)
@DATASETS.register_module()
class MotionVerseS2G3D(SingleMotionVerseDataset):
def __init__(self, **kwargs):
if 'dataset_path' not in kwargs:
kwargs['dataset_path'] = 's2g3d'
task_name = kwargs['task_name']
assert task_name in ['mocap', 's2g']
super().__init__(**kwargs)
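

# Minimal usage sketch (illustrative only; the data_prefix layout, split file
# name, and empty pipeline below are assumptions, not part of this module):
#
#     from mogen.datasets.motionverse_dataset import MotionVerseH3D
#
#     dataset = MotionVerseH3D(
#         task_name='t2m',
#         data_prefix='data/motionverse',  # expects datasets/humanml3d/ inside
#         ann_file='train.txt',
#         pipeline=[],                     # usually a list of transform cfgs
#     )
#     sample = dataset[0]                  # dict with motion path, meta data,
#                                          # and per-modality feature slots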