import torch
import numpy as np
import random
import os
import json
from scipy.signal import resample
import clip
from torch.utils.data import Dataset
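
# `clip` is assumed to be OpenAI's CLIP package (github.com/openai/CLIP);
# only `clip.tokenize` is used below, so any tokenizer exposing the same
# interface would do.
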
class CLIPDataset(Dataset):
    """Pairs simulated IMU sequences with their augmented text descriptions."""

    def __init__(self, args):
        imu_dirs = [
            f'{args.data_path}/sim/',
        ]
        text_dirs = [
            f'{args.data_path}/aug_texts/',
        ]
        # Keep only the basenames that have both an IMU file and a text file.
        self.paths = []
        for imu_dir, text_dir in zip(imu_dirs, text_dirs):
            imu_files = [os.path.splitext(f)[0] for f in os.listdir(imu_dir) if os.path.isfile(os.path.join(imu_dir, f))]
            text_files = {os.path.splitext(f)[0] for f in os.listdir(text_dir) if os.path.isfile(os.path.join(text_dir, f))}
            common_files = [f for f in imu_files if f in text_files]
            for f in common_files:
                self.paths.append((os.path.join(imu_dir, f + '.npy'), os.path.join(text_dir, f + '.txt')))

        self.args = args
        # Optionally subsample the dataset (args.sample is a fraction in (0, 1]).
        if args.sample < 1:
            self.paths = random.sample(self.paths, int(len(self.paths) * args.sample))

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        imu_path, text_path = self.paths[idx]
        imu = np.load(imu_path)  # shape: (time, 22 joints, 6 channels)
        imu[np.isnan(imu)] = 0

        # Pad short sequences by wrapping, then crop to a fixed length.
        if len(imu) < self.args.padding_size:
            imu = np.pad(imu, ((0, self.args.padding_size - len(imu)), (0, 0), (0, 0)), mode='wrap')
        imu = imu[:self.args.padding_size]

        # Randomly mark 1-5 of the 22 joints as observed for masked training.
        mask = np.zeros_like(imu)
        k = np.random.randint(1, 6)
        selected_joints = np.random.choice(22, k, replace=False)
        mask[:, selected_joints] = 1
        imu = imu.reshape(len(imu), -1)
        mask = mask.reshape(len(mask), -1)

        # Each text file holds one caption per line; '#' starts an inline comment.
        with open(text_path, 'r') as file:
            lines = file.readlines()
        text = random.choice(lines).split('#')[0].strip()

        return {'imu': imu, 'text': text, 'mask': mask}
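
# A minimal usage sketch (hypothetical: assumes an `args` object exposing
# `data_path`, `padding_size`, and `sample`, with the directory layout the
# class expects):
#
#     from types import SimpleNamespace
#     from torch.utils.data import DataLoader
#
#     args = SimpleNamespace(data_path='./data', padding_size=200, sample=1.0)
#     loader = DataLoader(CLIPDataset(args), batch_size=32, shuffle=True)
#     batch = next(iter(loader))  # batch['imu']: (32, padding_size, 22 * 6)
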

def select_samples(data, masks, labels, k, name, data_path):
    """Select k examples per class using precomputed few-shot indices."""
    unique_labels = torch.unique(labels)
    selected_data = []
    selected_masks = []
    selected_labels = []
    # One tensor of row indices per class, stored in label order.
    all_indices = torch.load(f'{data_path}/few_shot_data_2/{name}_k={k}.pth')

    for i, label in enumerate(unique_labels):
        selected_indices = all_indices[i]
        selected_data.append(data[selected_indices])
        selected_masks.append(masks[selected_indices])
        selected_labels.append(labels[selected_indices])

    selected_data = torch.cat(selected_data, dim=0)
    selected_masks = torch.cat(selected_masks, dim=0)
    selected_labels = torch.cat(selected_labels, dim=0)

    return selected_data, selected_masks, selected_labels
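
# `select_samples` assumes the index file already exists. A sketch of how such
# a file could be produced (hypothetical helper; it mirrors the on-the-fly
# generation in `load_custom_data` below and the expected path layout):
#
#     def make_few_shot_indices(labels, k, name, data_path):
#         all_indices = []
#         for label in torch.unique(labels):
#             indices = torch.where(labels == label)[0]
#             all_indices.append(indices[torch.randperm(len(indices))[:k]])
#         torch.save(all_indices, f'{data_path}/few_shot_data_2/{name}_k={k}.pth')
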
def load(dataset, padding_size, data_path, split='test', k=None):
    print(dataset)

    X = np.load(f'{data_path}/{dataset}/X_{split}.npy')
    real_labels = torch.from_numpy(np.load(f'{data_path}/{dataset}/y_{split}.npy'))
    with open(f'{data_path}/{dataset}/{dataset}.json', 'r') as file:
        data = json.load(file)
    # Canonical layout: (samples, time, 22 joints, 6 channels = acc xyz + gyro xyz).
    all_X = np.zeros((X.shape[0], X.shape[1], 22, 6))
    # Map each dataset's raw sensor columns onto canonical joint slots,
    # converting acceleration to m/s^2 and angular velocity to rad/s where needed.
    if dataset == 'PAMAP':
        all_X[:, :, 21] = np.concatenate((X[:, :, 0:3], X[:, :, 3:6]), axis=-1)
        all_X[:, :, 11] = np.concatenate((X[:, :, 18:21], X[:, :, 21:24]), axis=-1)
        all_X[:, :, 7] = np.concatenate((X[:, :, 9:12], X[:, :, 12:15]), axis=-1)
        original_sampling_rate = 100
        num_classes = 12

    elif dataset == 'USCHAD':
        all_X[:, :, 5] = np.concatenate((X[:, :, 0:3] * 9.80665, X[:, :, 3:6] / 180 * np.pi), axis=-1)
        original_sampling_rate = 100
        num_classes = 12

    elif dataset == 'UCIHAR':
        all_X[:, :, 9] = np.concatenate((X[:, :, 6:9] * 9.80665, X[:, :, 3:6]), axis=-1)
        original_sampling_rate = 50
        num_classes = 6

    elif dataset == 'Opp_g':
        all_X[:, :, 10] = np.concatenate((X[:, :, 0:3] / 1000 * 9.8, X[:, :, 3:6] / 1000), axis=-1)
        all_X[:, :, 19] = np.concatenate((X[:, :, 9:12] / 1000 * 9.8, X[:, :, 12:15] / 1000), axis=-1)
        all_X[:, :, 20] = np.concatenate((X[:, :, 18:21] / 1000 * 9.8, X[:, :, 21:24] / 1000), axis=-1)
        all_X[:, :, 15] = np.concatenate((X[:, :, 27:30] / 1000 * 9.8, X[:, :, 30:33] / 1000), axis=-1)
        all_X[:, :, 16] = np.concatenate((X[:, :, 36:39] / 1000 * 9.8, X[:, :, 39:42] / 1000), axis=-1)
        original_sampling_rate = 30
        num_classes = 4

    elif dataset == 'WISDM':
        all_X[:, :, 21] = np.concatenate((X[:, :, 0:3], X[:, :, 3:6]), axis=-1)
        original_sampling_rate = 20
        num_classes = 18

    elif dataset == 'DSADS':
        all_X[:, :, 11] = np.concatenate((X[:, :, 0:3], X[:, :, 3:6]), axis=-1)
        all_X[:, :, 21] = np.concatenate((X[:, :, 9:12], X[:, :, 12:15]), axis=-1)
        all_X[:, :, 17] = np.concatenate((X[:, :, 18:21], X[:, :, 21:24]), axis=-1)
        all_X[:, :, 6] = np.concatenate((X[:, :, 27:30], X[:, :, 30:33]), axis=-1)
        all_X[:, :, 2] = np.concatenate((X[:, :, 36:39], X[:, :, 39:42]), axis=-1)
        original_sampling_rate = 25
        num_classes = 19

    elif dataset == 'Harth':
        all_X[:, :, 9, :3] = X[:, :, :3] * 9.80665
        all_X[:, :, 6, :3] = X[:, :, 3:6] * 9.80665
        original_sampling_rate = 50
        num_classes = 12

    elif dataset == 'Wharf':
        # Decode raw 6-bit readings into acceleration in m/s^2 (range +/-1.5 g).
        X = -14.709 + X / 63 * (2 * 14.709)
        all_X[:, :, 21, :3] = X
        original_sampling_rate = 32
        num_classes = 14

    elif dataset == 'Mhealth':
        all_X[:, :, 11, :3] = X[:, :, 0:3]
        all_X[:, :, 3] = np.concatenate((X[:, :, 6:9], X[:, :, 9:12] / 180 * np.pi), axis=-1)
        all_X[:, :, 21] = np.concatenate((X[:, :, 15:18], X[:, :, 18:21] / 180 * np.pi), axis=-1)
        original_sampling_rate = 50
        num_classes = 12

    elif dataset == 'UTD-MHAD':
        # Sensor placement depends on the action: classes below 21 map to
        # joint 21, the remaining classes to joint 5.
        all_X[real_labels < 21, :, 21, :] = np.concatenate((X[real_labels < 21, :, 0:3] * 9.80665, X[real_labels < 21, :, 3:6] / 180 * np.pi), axis=-1)
        all_X[real_labels >= 21, :, 5, :] = np.concatenate((X[real_labels >= 21, :, 0:3] * 9.80665, X[real_labels >= 21, :, 3:6] / 180 * np.pi), axis=-1)
        original_sampling_rate = 50
        num_classes = 27

    elif dataset == 'MotionSense':
        # The same phone signal is written to both candidate joints (5 and 1).
        all_X[:, :, 5] = np.concatenate((X[:, :, :3] * 9.80665, X[:, :, 3:6]), axis=-1)
        all_X[:, :, 1] = np.concatenate((X[:, :, :3] * 9.80665, X[:, :, 3:6]), axis=-1)
        original_sampling_rate = 50
        num_classes = 6

    elif dataset == 'w-HAR':
        all_X[:, :, 7] = np.concatenate((X[:, :, :3] * 9.80665, X[:, :, 3:6] / 180 * np.pi), axis=-1)
        original_sampling_rate = 250
        num_classes = 7

    elif dataset == 'Shoaib':
        all_X[:, :, 1] = X[:, :, :6]
        all_X[:, :, 5] = X[:, :, 6:12]
        all_X[:, :, 21] = X[:, :, 12:18]
        all_X[:, :, 20] = X[:, :, 18:24]
        all_X[:, :, 0] = X[:, :, 24:30]
        original_sampling_rate = 50
        num_classes = 7

    elif dataset == 'har70plus':
        all_X[:, :, 0, :3] = X[:, :, :3] * 9.80665
        all_X[:, :, 5, :3] = X[:, :, 3:6] * 9.80665
        original_sampling_rate = 50
        num_classes = 7

    elif dataset == 'MMAct':
        all_X[:, :, 5] = np.concatenate((X[:, :, :3], X[:, :, 3:6]), axis=-1)
        all_X[:, :, 21, :3] = X[:, :, 6:9]
        original_sampling_rate = 50
        num_classes = 35

    elif dataset == 'realworld':
        all_X[:, :, 14] = np.concatenate((X[:, :, :3], X[:, :, 3:6]), axis=-1)
        all_X[:, :, 16] = np.concatenate((X[:, :, 6:9], X[:, :, 9:12]), axis=-1)
        all_X[:, :, 13] = np.concatenate((X[:, :, 12:15], X[:, :, 15:18]), axis=-1)
        all_X[:, :, 3] = np.concatenate((X[:, :, 18:21], X[:, :, 21:24]), axis=-1)
        all_X[:, :, 1] = np.concatenate((X[:, :, 24:27], X[:, :, 27:30]), axis=-1)
        all_X[:, :, 15] = np.concatenate((X[:, :, 30:33], X[:, :, 33:36]), axis=-1)
        all_X[:, :, 9] = np.concatenate((X[:, :, 36:39], X[:, :, 39:42]), axis=-1)
        original_sampling_rate = 50
        num_classes = 8

    elif dataset == 'TNDA-HAR':
        all_X[:, :, 20] = np.concatenate((X[:, :, :3], X[:, :, 3:6]), axis=-1)
        all_X[:, :, 2] = np.concatenate((X[:, :, 6:9], X[:, :, 9:12]), axis=-1)
        all_X[:, :, 21] = np.concatenate((X[:, :, 12:15], X[:, :, 15:18]), axis=-1)
        all_X[:, :, 3] = np.concatenate((X[:, :, 18:21], X[:, :, 21:24]), axis=-1)
        all_X[:, :, 11] = np.concatenate((X[:, :, 24:27], X[:, :, 27:30]), axis=-1)
        original_sampling_rate = 50
        num_classes = 8

    elif dataset == 'ut-complex':
        all_X[:, :, 5] = np.concatenate((X[:, :, :3], X[:, :, 3:6]), axis=-1)
        all_X[:, :, 21] = np.concatenate((X[:, :, 6:9], X[:, :, 9:12]), axis=-1)
        original_sampling_rate = 50
        num_classes = 13

    else:
        # Fail fast instead of hitting a NameError on original_sampling_rate below.
        raise ValueError(f'Unknown dataset: {dataset}')
    all_X = all_X.reshape(all_X.shape[0], all_X.shape[1], 22 * 6)

    # Resample every sequence to a common 20 Hz rate.
    new_sampling_rate = 20
    new_length = int((all_X.shape[1] / original_sampling_rate) * new_sampling_rate)
    resampled_data = np.array([resample(sequence, new_length) for sequence in all_X])

    # Pad to padding_size; the mask is 1 over real timesteps and 0 over padding.
    masks = np.ones_like(resampled_data)
    if resampled_data.shape[1] < padding_size:
        resampled_data = np.pad(resampled_data, ((0, 0), (0, padding_size - resampled_data.shape[1]), (0, 0)), 'wrap')
        masks = np.pad(masks, ((0, 0), (0, padding_size - masks.shape[1]), (0, 0)), 'constant')
    real_inputs = torch.from_numpy(resampled_data[:, :padding_size, :]).float()
    real_masks = torch.from_numpy(masks[:, :padding_size, :]).float()

    # Few-shot training: keep k examples per class via precomputed indices.
    if split == 'train' and k and k < len(real_inputs):
        real_inputs, real_masks, real_labels = select_samples(real_inputs, real_masks, real_labels, k, dataset, data_path)
        print(real_inputs.shape, real_labels.shape)

    # Tokenize the label names once for CLIP-style text matching.
    label_dictionary = data['label_dictionary']
    label_list = [' '.join(labels) for labels in label_dictionary.values()]
    all_text = clip.tokenize(label_list).cuda()

    return real_inputs, real_masks, real_labels, label_list, all_text, num_classes
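
# A minimal usage sketch for `load` (hypothetical paths; assumes the layout
# {data_path}/{dataset}/X_{split}.npy, y_{split}.npy, {dataset}.json, and a
# CUDA device, since the label tokens are moved to GPU):
#
#     inputs, masks, labels, label_list, text_tokens, n_cls = load(
#         'PAMAP', padding_size=200, data_path='./data', split='test')
#     # inputs: (N, 200, 132) float tensor; masks flag real vs padded steps
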

def load_multiple(dataset_list, padding_size, data_path, split='test', k=None):
    """Load several datasets and collect the per-dataset outputs of `load`."""
    real_inputs_list, real_masks_list, real_labels_list, label_list_list, all_text_list, num_classes_list = [], [], [], [], [], []
    for dataset in dataset_list:
        real_inputs, real_masks, real_labels, label_list, all_text, num_classes = load(dataset, padding_size, data_path, split, k)
        real_inputs_list.append(real_inputs)
        real_masks_list.append(real_masks)
        real_labels_list.append(real_labels)
        label_list_list.append(label_list)
        all_text_list.append(all_text)
        num_classes_list.append(num_classes)

    return real_inputs_list, real_masks_list, real_labels_list, label_list_list, all_text_list, num_classes_list
def load_custom_data(X_path, y_path, config_path, joint_list, original_sampling_rate, padding_size=200, split='test', k=None, few_shot_path=None):
    X = np.load(X_path)
    real_labels = torch.from_numpy(np.load(y_path))
    with open(config_path, 'r') as file:
        data = json.load(file)
    all_X = np.zeros((X.shape[0], X.shape[1], 22, 6))

    # Place each 6-channel sensor block at its joint slot in the canonical layout.
    for i, joint in enumerate(joint_list):
        all_X[:, :, joint] = np.concatenate((X[:, :, 6 * i:6 * i + 3], X[:, :, 6 * i + 3:6 * i + 6]), axis=-1)

    all_X = all_X.reshape(all_X.shape[0], all_X.shape[1], 22 * 6)

    # Resample every sequence to a common 20 Hz rate.
    new_sampling_rate = 20
    new_length = int((all_X.shape[1] / original_sampling_rate) * new_sampling_rate)
    resampled_data = np.array([resample(sequence, new_length) for sequence in all_X])

    # Pad to padding_size; the mask is 1 over real timesteps and 0 over padding.
    masks = np.ones_like(resampled_data)
    if resampled_data.shape[1] < padding_size:
        resampled_data = np.pad(resampled_data, ((0, 0), (0, padding_size - resampled_data.shape[1]), (0, 0)), 'wrap')
        masks = np.pad(masks, ((0, 0), (0, padding_size - masks.shape[1]), (0, 0)), 'constant')
    real_inputs = torch.from_numpy(resampled_data[:, :padding_size, :]).float()
    real_masks = torch.from_numpy(masks[:, :padding_size, :]).float()

    if split == 'train' and k and k < len(real_inputs):
        unique_labels = torch.unique(real_labels)

        # Draw k indices per class, or reuse a saved index file if one is given.
        if few_shot_path is None:
            print('Generating few shot indices ...')
            all_indices = []
            for i, label in enumerate(unique_labels):
                indices = torch.where(real_labels == label)[0]
                selected_indices = indices[torch.randperm(len(indices))[:k]]
                all_indices.append(selected_indices)
        else:
            print('Loading existing few shot indices ...')
            all_indices = torch.load(few_shot_path)

        selected_data = []
        selected_masks = []
        selected_labels = []
        for i, label in enumerate(unique_labels):
            selected_indices = all_indices[i]
            selected_data.append(real_inputs[selected_indices])
            selected_masks.append(real_masks[selected_indices])
            selected_labels.append(real_labels[selected_indices])
        selected_data = torch.cat(selected_data, dim=0)
        selected_masks = torch.cat(selected_masks, dim=0)
        selected_labels = torch.cat(selected_labels, dim=0)
        real_inputs, real_masks, real_labels = selected_data, selected_masks, selected_labels

    print(real_inputs.shape, real_labels.shape)

    # Tokenize the label names once for CLIP-style text matching.
    label_dictionary = data['label_dictionary']
    label_list = [' '.join(labels) for labels in label_dictionary.values()]
    all_text = clip.tokenize(label_list).cuda()

    return real_inputs, real_masks, real_labels, label_list, all_text
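
# A minimal usage sketch for `load_custom_data` (all paths and the joint list
# are hypothetical; `joint_list[i]` names the canonical joint for the i-th
# 6-column accelerometer+gyroscope block of X):
#
#     inputs, masks, labels, label_list, text_tokens = load_custom_data(
#         'X_test.npy', 'y_test.npy', 'config.json',
#         joint_list=[21, 5], original_sampling_rate=50, padding_size=200)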