from torch.utils.data import Dataset
import torch
from scipy.io import loadmat
import numpy as np
import pandas as pd
import os
from scipy import signal
from sklearn.preprocessing import MinMaxScaler
class MyDataset(Dataset):
    """Dataset serving either the training or the test split.

    CSV recordings are read from ``<path>/trainingB``, ``<path>/trainingT``,
    ``<path>/testingB`` and ``<path>/testingT`` ('B' files are class 1,
    'T' files are class 2), min-max scaled, cut into (500, 1) windows and
    zero-padded along the second axis to the longest length found across
    both splits.
    """

    def __init__(self, path: str, dataset: str):
        """
        :param path: root directory containing the four split sub-folders
        :param dataset: ``'train'`` or ``'test'`` — which split this
            instance exposes through ``__getitem__`` / ``__len__``
        """
        super(MyDataset, self).__init__()
        self.dataset = dataset  # which split this instance serves
        # Parenthesized target list: pre_option returns an 11-tuple.
        # (Attribute name ``train_dataset_with_no_paddding`` keeps its
        # historical spelling so external callers are not broken.)
        (self.train_len,
         self.test_len,
         self.input_len,
         self.channel_len,
         self.output_len,
         self.train_dataset,
         self.train_label,
         self.test_dataset,
         self.test_label,
         self.max_length_sample_inTest,
         self.train_dataset_with_no_paddding) = self.pre_option(path)

    def __getitem__(self, index):
        """Return ``(sample, label)``; labels are stored as 1/2 and shifted
        here to 0-based class indices."""
        if self.dataset == 'train':
            return self.train_dataset[index], self.train_label[index] - 1
        elif self.dataset == 'test':
            return self.test_dataset[index], self.test_label[index] - 1
        raise ValueError(f"unknown split {self.dataset!r}; expected 'train' or 'test'")

    def __len__(self):
        """Number of samples in the selected split."""
        if self.dataset == 'train':
            return self.train_len
        elif self.dataset == 'test':
            return self.test_len
        raise ValueError(f"unknown split {self.dataset!r}; expected 'train' or 'test'")

    def pre_option(self, path: str):
        """Load, normalise and zero-pad the training and test splits.

        Because samples could in principle differ in their second-axis
        length, the longest length across both splits is used and shorter
        samples are right-padded with zeros. (With the fixed (500, 1)
        windows produced by ``read_data`` that length is always 1, so no
        padding actually occurs.)

        :param path: dataset root directory
        :return: (train sample count, test sample count, time-step dim,
            channel count, number of classes, train data, train labels,
            test data, test labels, list of longest test samples,
            un-padded train data as nested lists)
        """

        def read_data(split_name):
            # Read every CSV in the split folder, keep the first channel of
            # the first 15000 rows of each file, then cut the concatenated
            # signal into windows of 500 time steps -> shape (-1, 500, 1).
            # sorted() makes the window order deterministic across runs.
            folder = os.path.join(path, split_name)
            chunks = []
            for file_name in sorted(os.listdir(folder)):
                frame = pd.read_csv(os.path.join(folder, file_name), header=None)
                chunks.append(np.array(frame)[0:15000, 0:1])
            return np.concatenate(chunks, axis=0).reshape(-1, 500, 1)

        train_healthy = read_data('trainingB')
        train_diabetic = read_data('trainingT')
        test_healthy = read_data('testingB')
        test_diabetic = read_data('testingT')

        train_data = np.concatenate((train_healthy, train_diabetic), axis=0)
        test_data = np.concatenate((test_healthy, test_diabetic), axis=0)
        # Class 1 = 'B' files, class 2 = 'T' files (__getitem__ shifts to 0/1).
        train_label = np.array([1] * len(train_healthy) + [2] * len(train_diabetic))
        test_label = np.array([1] * len(test_healthy) + [2] * len(test_diabetic))

        train_len = len(train_data)
        test_len = len(test_data)
        output_len = 2  # binary classification

        # Longest second-axis length across both splits; everything shorter
        # is zero-padded up to it below.
        max_length = 0
        for sample in (*train_data, *test_data):
            max_length = max(max_length, torch.as_tensor(sample).float().shape[1])

        def scale_and_pad(sample):
            # Per-column min-max scaling to [0, 1], then zero-pad the second
            # axis up to max_length. Returns (scaled ndarray, raw tensor,
            # padded tensor) — the raw tensor is needed to detect samples
            # that already have the maximum length.
            scaled = MinMaxScaler().fit_transform(sample)
            raw = torch.as_tensor(scaled).float()
            padded = raw
            if raw.shape[1] != max_length:
                pad = torch.zeros(raw.shape[0], max_length - raw.shape[1])
                padded = torch.cat((raw, pad), dim=1)
            return scaled, raw, padded

        train_dataset_with_no_paddding = []
        test_dataset_with_no_paddding = []
        train_dataset = []
        test_dataset = []
        max_length_sample_inTest = []

        for sample in train_data:
            scaled, _, padded = scale_and_pad(sample)
            train_dataset_with_no_paddding.append(scaled.transpose(-1, -2).tolist())
            train_dataset.append(padded)

        for sample in test_data:
            scaled, raw, padded = scale_and_pad(sample)
            test_dataset_with_no_paddding.append(scaled.transpose(-1, -2).tolist())
            if raw.shape[1] == max_length:
                # Keep the un-padded longest test samples for later inspection.
                max_length_sample_inTest.append(raw.transpose(-1, -2))
            test_dataset.append(padded)

        # Final shape: (sample count, max time-step length, channels) before
        # the permute below swaps the last two axes.
        train_dataset = torch.stack(train_dataset, dim=0).permute(0, 2, 1)
        test_dataset = torch.stack(test_dataset, dim=0).permute(0, 2, 1)
        train_label = torch.Tensor(train_label)
        test_label = torch.Tensor(test_label)

        # NOTE(review): with (500, 1) windows the permuted samples are
        # (1, 500), so ``channel`` ends up 500 and ``input_len`` 1 — the
        # names look swapped, but this matches the original behaviour and
        # is preserved for downstream consumers; confirm before changing.
        channel = test_dataset[0].shape[-1]
        input_len = test_dataset[0].shape[-2]
        return (train_len, test_len, input_len, channel, output_len,
                train_dataset, train_label, test_dataset, test_label,
                max_length_sample_inTest, train_dataset_with_no_paddding)