-
Notifications
You must be signed in to change notification settings - Fork 164
/
stock_data.py
101 lines (79 loc) · 3.78 KB
/
stock_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import numpy as np
import math
def scale_range(x, input_range, target_range):
    """
    Linearly rescale a numpy array from input_range to target_range.

    :param x: numpy array of data to scale
    :param input_range: [min, max] of the input domain used for normalization
    :param target_range: [min, max] of the desired output range
    :return: (rescaled array, [observed min, observed max] of the incoming data)
    """
    # Record the actual bounds of the data so callers can invert the scaling later.
    # (Renamed from `range`, which shadowed the builtin.)
    observed_bounds = [np.amin(x), np.amax(x)]
    # Normalize to [0, 1] relative to input_range; the 1.0* keeps the division
    # floating-point even for integer inputs on Python 2.
    x_std = (x - input_range[0]) / (1.0*(input_range[1] - input_range[0]))
    # Stretch and shift the normalized values into target_range.
    x_scaled = x_std * (1.0*(target_range[1] - target_range[0])) + target_range[0]
    return x_scaled, observed_bounds
def train_test_split_linear_regression(stocks):
    """
    Split the data set into training and testing features for the Linear Regression model.

    :param stocks: DataFrame containing at least ['Item', 'Close'] columns
    :return: X_train: training set of features
    :return: X_test: test set of features
    :return: y_train: training set of labels
    :return: y_test: test set of labels
    :return: label_range: [min, max] of the raw label values, used to rescale
        predicted prices back to the original scale
    """
    # Convert dataframe columns to column vectors (lists of 1-element lists),
    # the shape scikit-learn expects for a single feature.
    feature = [[row['Item']] for _, row in stocks.iterrows()]
    label = [[row['Close']] for _, row in stocks.iterrows()]

    # Min/max of the raw data, needed by scale_range and for rescaling later.
    feature_bounds = [min(feature)[0], max(feature)[0]]
    label_bounds = [min(label)[0], max(label)[0]]

    # Regularize features and labels into [-1, 1].
    feature_scaled, feature_range = scale_range(np.array(feature), input_range=feature_bounds, target_range=[-1.0, 1.0])
    label_scaled, label_range = scale_range(np.array(label), input_range=label_bounds, target_range=[-1.0, 1.0])

    # Hold out the last ~31.5% of rows as the test set.
    # NOTE(review): the original comment claimed an "80/20" split, but the code
    # actually uses 0.315 — kept as-is to preserve behavior; confirm intent.
    split = int(math.floor(len(stocks['Item']) * .315))

    # Chronological split, no shuffling: earlier rows train, later rows test.
    X_train = feature_scaled[:-split]
    X_test = feature_scaled[-split:]
    y_train = label_scaled[:-split]
    y_test = label_scaled[-split:]

    return X_train, X_test, y_train, y_test, label_range
def train_test_split_lstm(stocks, prediction_time=1, test_data_size=450, unroll_length=50):
    """
    Split the data set into training and testing data for the Long Short Term Memory model.

    :param stocks: DataFrame containing at least the ['Close'] column
    :param prediction_time: number of days ahead the label is taken from
    :param test_data_size: number of rows reserved for the test set
    :param unroll_length: window length used later when unrolling; extra rows
        are reserved so test windows do not overlap the training data
    :return: x_train: training set of features
    :return: x_test: test set of features
    :return: y_train: training set of labels
    :return: y_test: test set of labels
    """
    # Trailing rows reserved for testing: the test set itself, plus one full
    # unroll window and one extra row so a complete window fits at the boundary.
    test_data_cut = test_data_size + unroll_length + 1

    # Training data: everything before the test cut; labels are the 'Close'
    # price shifted forward by prediction_time days.
    # .values replaces DataFrame.as_matrix(), which was removed in pandas 1.0.
    x_train = stocks[0:-prediction_time - test_data_cut].values
    y_train = stocks[prediction_time:-test_data_cut]['Close'].values

    # Test data: the reserved trailing slice, with the same label shift.
    x_test = stocks[0 - test_data_cut:-prediction_time].values
    y_test = stocks[prediction_time - test_data_cut:]['Close'].values

    return x_train, x_test, y_train, y_test
def unroll(data, sequence_length=24):
    """
    Slice data into overlapping windows of sequence_length consecutive rows.

    Using distinct sliding windows for testing and training helps stop
    information in the data from leaking between the splits.

    :param data: data set to be unrolled
    :param sequence_length: window length
    :return: numpy array of all windows of length sequence_length
    """
    windows = [data[start:start + sequence_length]
               for start in range(len(data) - sequence_length)]
    return np.asarray(windows)