1
0
Fork 0
mirror of https://github.com/Findus23/collision-analyisis-and-interpolation.git synced 2024-09-18 14:03:51 +02:00

add pytorch model

This commit is contained in:
Lukas Winkler 2021-03-29 15:02:46 +02:00
parent 88bbf9a160
commit b6b55519fc
Signed by: lukas
GPG key ID: 54DE4D798D244853
5 changed files with 227 additions and 131 deletions

View file

@ -1,4 +1,4 @@
from typing import List, Iterator
from typing import List, Iterator, Optional
import numpy as np
@ -10,8 +10,8 @@ class CustomScaler:
"""
def __init__(self):
self.means = None
self.stds = None
self.means: Optional[np.ndarray] = None
self.stds: Optional[np.ndarray] = None
def fit(self, data: np.ndarray) -> None:
self.means = np.mean(data, 0)

19
network.py Normal file
View file

@ -0,0 +1,19 @@
from torch import nn
class Network(nn.Module):
def __init__(self):
super().__init__()
self.hidden = nn.Linear(6, 50)
self.output = nn.Linear(50, 4)
self.sigmoid = nn.Sigmoid()
self.relu = nn.ReLU()
def forward(self, x):
x = self.hidden(x)
x = self.relu(x)
x = self.output(x)
x = self.sigmoid(x)
return x

View file

@ -1,101 +1,167 @@
import os
import json
import random
from pathlib import Path
import keras
import numpy as np
from keras import Sequential
from keras.engine.saving import load_model
from keras.layers import Dense
from keras.utils import plot_model
import torch
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.lines import Line2D
from torch import nn, optim, from_numpy, Tensor
from torch.utils.data import DataLoader, TensorDataset
from CustomScaler import CustomScaler
from config import water_fraction
from network import Network
from simulation_list import SimulationList
simulations = SimulationList.jsonlines_load()
train_data = set([s for s in simulations.simlist])
new_data = [s for s in simulations.simlist if s.type != "original"]
random.seed(1)
test_data = set(random.sample(new_data, int(len(new_data) * 0.2)))
train_data -= test_data
print(len(train_data), len(test_data))
X = np.array(
[[s.alpha, s.v, s.projectile_mass, s.gamma, s.target_water_fraction, s.projectile_water_fraction] for s in
train_data])
scaler = CustomScaler()
scaler.fit(X)
x = scaler.transform_data(X)
print(x.shape)
if water_fraction:
Y = np.array([s.water_retention_both for s in train_data])
else:
Y = np.array([s.mass_retention_both for s in train_data])
print(Y.shape)
X_test = np.array(
[[s.alpha, s.v, s.projectile_mass, s.gamma, s.target_water_fraction, s.projectile_water_fraction] for s in
test_data])
Y_test = np.array([s.mass_retention_both for s in test_data])
x_test = scaler.transform_data(X_test)
# print(X_test)
# print(X[0])
# exit()
random.seed()
tbCallBack = keras.callbacks.TensorBoard(log_dir='./logs/{}'.format(random.randint(0, 100)), histogram_freq=1,
batch_size=32, write_graph=True,
write_grads=True, write_images=True, embeddings_freq=0,
embeddings_layer_names=None, embeddings_metadata=None, embeddings_data=None,
update_freq='epoch')
modelname = "model.hd5" if water_fraction else "model_mass.hd5"
def train():
filename = "rsmc_dataset"
if os.path.exists(modelname):
model = load_model(modelname)
else:
model = Sequential()
model.add(Dense(6, input_dim=6, activation='relu'))
model.add(Dense(4, kernel_initializer='normal', activation='relu'))
model.add(Dense(3, kernel_initializer='normal', activation='relu'))
model.add(Dense(1, kernel_initializer='normal', activation="sigmoid"))
model.compile(loss='mean_squared_error', optimizer='adam')
simulations = SimulationList.jsonlines_load(Path(f"{filename}.jsonl"))
model.summary()
plot_model(model, "model.png", show_shapes=True, show_layer_names=True)
model.fit(x, Y, epochs=200, callbacks=[tbCallBack], validation_data=(x_test, Y_test))
loss = model.evaluate(x_test, Y_test)
print(loss)
if loss > 0.04:
# exit()
...
# print("-------------------------------------")
# exit()
model.save(modelname)
# random.seed(1)
test_data = random.sample(simulations.simlist, int(len(simulations.simlist) * 0.2))
test_set = set(test_data) # use a set for faster *in* computation
train_data = [s for s in simulations.simlist if s not in test_set]
print(len(train_data), len(test_data))
xrange = np.linspace(-0.5, 60.5, 300)
yrange = np.linspace(0.5, 5.5, 300)
xgrid, ygrid = np.meshgrid(xrange, yrange)
mcode = 1e24
wpcode = 15 / 100
wtcode = 15 / 100
gammacode = 0.6
testinput = np.array([[np.nan, np.nan, mcode, gammacode, wtcode, wpcode]] * 300 * 300)
testinput[::, 0] = xgrid.flatten()
testinput[::, 1] = ygrid.flatten()
testinput = scaler.transform_data(testinput)
X = np.array(
[[s.alpha, s.v, s.projectile_mass, s.gamma, s.target_water_fraction, s.projectile_water_fraction] for s in
train_data])
scaler = CustomScaler()
scaler.fit(X)
x = scaler.transform_data(X)
print(x.shape)
Y = np.array([[
s.water_retention_both, s.mantle_retention_both, s.core_retention_both,
s.output_mass_fraction
] for s in train_data])
print(testinput)
print(testinput.shape)
testoutput = model.predict(testinput)
outgrid = np.reshape(testoutput, (300, 300))
print("minmax")
print(np.nanmin(outgrid), np.nanmax(outgrid))
cmap = "Blues" if water_fraction else "Oranges"
plt.imshow(outgrid, interpolation='none', cmap=cmap, aspect="auto", origin="lower", vmin=0, vmax=1,
extent=[xgrid.min(), xgrid.max(), ygrid.min(), ygrid.max()])
X_test = np.array(
[[s.alpha, s.v, s.projectile_mass, s.gamma, s.target_water_fraction, s.projectile_water_fraction] for s in
test_data])
Y_test = np.array([[
s.water_retention_both, s.mantle_retention_both, s.core_retention_both,
s.output_mass_fraction
] for s in test_data])
x_test = scaler.transform_data(X_test)
random.seed()
plt.colorbar().set_label("water retention fraction" if water_fraction else "core mass retention fraction")
plt.xlabel("impact angle $\\alpha$ [$^{\circ}$]")
plt.ylabel("velocity $v$ [$v_{esc}$]")
plt.tight_layout()
plt.savefig("../arbeit/images/plots/nn2.pdf")
plt.show()
dataset = TensorDataset(from_numpy(x).to(torch.float), from_numpy(Y).to(torch.float))
train_dataset = TensorDataset(from_numpy(x_test).to(torch.float), from_numpy(Y_test).to(torch.float))
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
validation_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=False)
network = Network()
loss_fn = nn.MSELoss()
optimizer = optim.Adam(network.parameters())
loss_train = []
loss_vali = []
max_epochs = 120
epochs = 0
fig: Figure = plt.figure()
ax: Axes = fig.gca()
x_axis = np.arange(epochs)
loss_plot: Line2D = ax.plot(x_axis, loss_train, label="loss_train")[0]
vali_plot: Line2D = ax.plot(x_axis, loss_vali, label="loss_validation")[0]
ax.legend()
plt.ion()
plt.pause(0.01)
plt.show()
for e in range(max_epochs):
print(f"Epoch: {e}")
total_loss = 0
network.train()
for xs, ys in dataloader:
# Training pass
optimizer.zero_grad()
output = network(xs)
loss = loss_fn(output, ys)
loss.backward()
optimizer.step()
total_loss += loss.item()
loss_train.append(float(total_loss / len(dataloader)))
print(f"Training loss: {total_loss / len(dataloader)}")
# validation:
network.eval()
total_loss_val = 0
for xs, ys in validation_dataloader:
output = network(xs)
total_loss_val += loss_fn(output, ys).item()
loss_vali.append(float(total_loss_val / len(validation_dataloader)))
print(f"Validation loss: {total_loss_val / len(validation_dataloader)}")
epochs += 1
x_axis = np.arange(epochs)
loss_plot.set_xdata(x_axis)
vali_plot.set_xdata(x_axis)
loss_plot.set_ydata(loss_train)
vali_plot.set_ydata(loss_vali)
ax.relim()
ax.autoscale_view(True, True, True)
plt.pause(0.01)
# plt.draw()
# if epochs > 6:
# a = np.sum(np.array(loss_vali[-3:]))
# b = np.sum(np.array(loss_vali[-6:-3]))
# if a > b: # overfitting on training data, stop training
# print("early stopping")
# break
plt.ioff()
torch.save(network.state_dict(), "pytorch_model.zip")
with open("pytorch_model.json", "w") as f:
export_dict = {}
value_tensor: Tensor
for key, value_tensor in network.state_dict().items():
export_dict[key] = value_tensor.detach().tolist()
export_dict["means"] = scaler.means.tolist()
export_dict["stds"] = scaler.stds.tolist()
json.dump(export_dict, f)
xrange = np.linspace(-0.5, 60.5, 300)
yrange = np.linspace(0.5, 5.5, 300)
xgrid, ygrid = np.meshgrid(xrange, yrange)
mcode = 1e24
wpcode = 1e-4
wtcode = 1e-4
gammacode = 0.6
testinput = np.array([[np.nan, np.nan, mcode, gammacode, wtcode, wpcode]] * 300 * 300)
testinput[::, 0] = xgrid.flatten()
testinput[::, 1] = ygrid.flatten()
testinput = scaler.transform_data(testinput)
print(testinput)
print(testinput.shape)
testoutput: Tensor = network(from_numpy(testinput).to(torch.float))
data = testoutput.detach().numpy()
outgrid = np.reshape(data[::, 0], (300, 300))
print("minmax")
print(np.nanmin(outgrid), np.nanmax(outgrid))
cmap = "Blues"
plt.title(
"m={:3.0e}, gamma={:3.1f}, wt={:2.0f}%, wp={:2.0f}%\n".format(mcode, gammacode, wtcode * 100, wpcode * 100))
plt.imshow(outgrid, interpolation='none', cmap=cmap, aspect="auto", origin="lower", vmin=0, vmax=1,
extent=[xgrid.min(), xgrid.max(), ygrid.min(), ygrid.max()])
plt.colorbar().set_label("water retention fraction")
plt.xlabel("impact angle $\\alpha$ [$^{\circ}$]")
plt.ylabel("velocity $v$ [$v_{esc}$]")
plt.tight_layout()
# plt.savefig("/home/lukas/tmp/nn.svg", transparent=True)
plt.show()
if __name__ == '__main__':
train()

View file

@ -1,4 +1,5 @@
import json
from typing import Optional
class Simulation:
@ -76,7 +77,6 @@ class Simulation:
def second_largest_aggregate_mantle_fraction(self) -> float:
return 1 - self.second_largest_aggregate_core_fraction - self.second_largest_aggregate_water_fraction
@property
def projectile_mantle_fraction(self):
return 1 - self.projectile_water_fraction - self.projectile_core_fraction
@ -128,6 +128,12 @@ class Simulation:
"""
return self.largest_aggregate_mass * self.largest_aggregate_water_fraction / self.initial_water_mass
@property
def output_mass_fraction(self) -> Optional[float]:
if not self.largest_aggregate_mass:
return 0 # FIXME
return self.second_largest_aggregate_mass / self.largest_aggregate_mass
@property
def original_simulation(self) -> bool:
return self.type == "original"

View file

@ -1,79 +1,84 @@
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from keras.engine.saving import load_model
import torch
from matplotlib.collections import QuadMesh
from matplotlib.widgets import Slider
from sklearn.preprocessing import StandardScaler
from CustomScaler import CustomScaler
from network import Network
from simulation_list import SimulationList
simlist = SimulationList.jsonlines_load()
simlist = SimulationList.jsonlines_load(Path("rsmc_dataset.jsonl"))
resolution = 100
X = simlist.X
scaler = StandardScaler()
scaler.fit(X)
data = simlist.X
scaler = CustomScaler()
scaler.fit(data)
fig, ax = plt.subplots()
plt.subplots_adjust(bottom=0.35)
t = np.arange(0.0, 1.0, 0.001)
mcode_default, gamma_default, wt_default, wp_default = [24.0, 1, 15.0, 15.0]
xrange = np.linspace(-0.5, 60.5, 100)
yrange = np.linspace(0.5, 5.5, 100)
xgrid, ygrid = np.meshgrid(xrange, yrange)
mcode = 24.
wpcode = 15 / 100
wtcode = 15 / 100
gammacode = 1
alpharange = np.linspace(-0.5, 60.5, resolution)
vrange = np.linspace(0.5, 5.5, resolution)
grid_alpha, grid_v = np.meshgrid(alpharange, vrange)
testinput = np.array([[np.nan, np.nan, mcode, gammacode, wtcode, wpcode]] * 100 * 100)
testinput[::, 0] = xgrid.flatten()
testinput[::, 1] = ygrid.flatten()
testinput = scaler.transform(testinput)
model = Network()
model.load_state_dict(torch.load("pytorch_model.zip"))
model = load_model("model.hd5")
datagrid = np.zeros_like(grid_alpha)
testoutput = model.predict(testinput)
outgrid = np.reshape(testoutput, (100, 100))
mesh = plt.pcolormesh(xgrid, ygrid, outgrid, cmap="Blues", vmin=0, vmax=1) # type:QuadMesh
mesh = plt.pcolormesh(grid_alpha, grid_v, datagrid, cmap="Blues", vmin=0, vmax=1, shading="auto") # type:QuadMesh
plt.colorbar()
axcolor = 'lightgoldenrodyellow'
ax_mcode = plt.axes([0.25, 0.1, 0.65, 0.03], facecolor=axcolor)
ax_gamma = plt.axes([0.25, 0.15, 0.65, 0.03], facecolor=axcolor)
ax_wt = plt.axes([0.25, 0.20, 0.65, 0.03], facecolor=axcolor)
ax_wp = plt.axes([0.25, 0.25, 0.65, 0.03], facecolor=axcolor)
ax_mcode = plt.axes([0.25, 0.1, 0.65, 0.03])
ax_gamma = plt.axes([0.25, 0.15, 0.65, 0.03])
ax_wt = plt.axes([0.25, 0.20, 0.65, 0.03])
ax_wp = plt.axes([0.25, 0.25, 0.65, 0.03])
ax_mode = plt.axes([0.25, 0.05, 0.65, 0.03])
s_mcode = Slider(ax_mcode, 'mcode', 21, 25, valinit=mcode_default)
s_gamma = Slider(ax_gamma, 'gamma', 0.1, 1, valinit=gamma_default)
s_wt = Slider(ax_wt, 'wt', 10, 20, valinit=wt_default)
s_wp = Slider(ax_wp, 'wp', 10, 20, valinit=wp_default)
s_wt = Slider(ax_wt, 'wt', 1e-5, 1e-3, valinit=wt_default)
s_wp = Slider(ax_wp, 'wp', 1e-5, 1e-3, valinit=wp_default)
s_mode = Slider(ax_mode, 'shell/mantle/core/mass_fraction', 1, 4, valinit=1, valstep=1)
def update(val):
mcode = 10 ** s_mcode.val
mcode = s_mcode.val
gamma = s_gamma.val
wt = s_wt.val / 100
wp = s_wp.val / 100
testinput = np.array([[np.nan, np.nan, mcode, gamma, wt, wp]] * 100 * 100)
testinput[::, 0] = xgrid.flatten()
testinput[::, 1] = ygrid.flatten()
testinput = scaler.transform(testinput)
wt = s_wt.val
wp = s_wp.val
mode = s_mode.val
testinput = np.array([[np.nan, np.nan, 10 ** mcode, gamma, wt, wp]] * resolution * resolution)
testinput[::, 0] = grid_alpha.flatten()
testinput[::, 1] = grid_v.flatten()
testinput = scaler.transform_data(testinput)
testoutput = model.predict(testinput)
outgrid = np.reshape(testoutput, (100, 100))
# if not isinstance(datagrid, np.ndarray):
# return False
formatedgrid = outgrid[:-1, :-1]
try:
testoutput: torch.Tensor = model(torch.from_numpy(testinput).to(torch.float))
data = testoutput.detach().numpy()
print(data.shape)
except TypeError: # can't convert np.ndarray of type numpy.object_.
data = np.zeros((resolution ** 2, 3))
mesh.set_array(formatedgrid.ravel())
datagrid = np.reshape(data[::, mode - 1], (resolution, resolution))
mesh.set_array(datagrid.ravel())
fig.canvas.draw_idle()
update(None)
s_gamma.on_changed(update)
s_mcode.on_changed(update)
s_wp.on_changed(update)
s_wt.on_changed(update)
s_mode.on_changed(update)
plt.show()