import numpy as np
from typing import List, Tuple


class ImprovedBPNetwork:
    """Improved BP neural network with dynamic learning-rate adjustment."""

    def __init__(self, hidden_layers: List[int], learning_rate: float = 0.01,
                 max_epochs: int = 1000, tolerance: float = 1e-6):
        self.hidden_layers = hidden_layers
        self.initial_lr = learning_rate  # kept so the adaptive rule can clip around it
        self.learning_rate = learning_rate
        self.max_epochs = max_epochs
        self.tolerance = tolerance
        self.weights = []
        self.biases = []
        self.loss_history = []

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Sigmoid activation function."""
        x = np.clip(x, -500, 500)  # prevent overflow in np.exp
        return 1 / (1 + np.exp(-x))

    def _sigmoid_derivative(self, x: np.ndarray) -> np.ndarray:
        """Derivative of the sigmoid function."""
        s = self._sigmoid(x)
        return s * (1 - s)

    def _initialize_weights(self, input_size: int, output_size: int):
        """Initialize weights and biases."""
        self.weights = []
        self.biases = []

        # Build the layer sizes of the network, e.g. input 4, hidden [8, 6],
        # output 3 gives weight shapes (4, 8), (8, 6), (6, 3)
        layers = [input_size] + self.hidden_layers + [output_size]

        # Xavier (Glorot) initialization: variance 2 / (fan_in + fan_out)
        for i in range(len(layers) - 1):
            w = np.random.normal(0, np.sqrt(2.0 / (layers[i] + layers[i+1])),
                                 (layers[i], layers[i+1]))
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)

    def _forward_pass(self, X: np.ndarray) -> List[np.ndarray]:
        """Forward propagation; returns the activation of every layer."""
        activations = [X]

        for i in range(len(self.weights)):
            z = np.dot(activations[-1], self.weights[i]) + self.biases[i]
            a = self._sigmoid(z)
            activations.append(a)

        return activations

    def _backward_pass(self, X: np.ndarray, y: np.ndarray, activations: List[np.ndarray]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Backpropagation; returns the gradients of the weights and biases."""
        m = X.shape[0]
        dw = [np.zeros_like(w) for w in self.weights]
        db = [np.zeros_like(b) for b in self.biases]

        # Output-layer error. Note: (a - y) is the exact pre-activation gradient
        # of the cross-entropy loss with sigmoid outputs; fit() reports MSE, so
        # this is the common "delta rule" simplification rather than the strict
        # MSE gradient (which would carry an extra sigmoid-derivative factor).
        delta = activations[-1] - y

        # Propagate the error from the output layer back to the input layer
        for i in range(len(self.weights) - 1, -1, -1):
            dw[i] = np.dot(activations[i].T, delta) / m
            db[i] = np.mean(delta, axis=0, keepdims=True)

            if i > 0:
                # Error at layer i; since activations[i] = sigmoid(z_i), the
                # derivative sigmoid'(z_i) is activations[i] * (1 - activations[i])
                delta = np.dot(delta, self.weights[i].T) * activations[i] * (1 - activations[i])

        return dw, db

    def _adaptive_learning_rate(self, epoch: int, current_loss: float, prev_loss: float):
        """Dynamically adjust the learning rate based on the loss trend."""
        if epoch > 0:
            if current_loss > prev_loss:
                # Loss increased: decrease the learning rate
                self.learning_rate *= 0.9
            elif (prev_loss - current_loss) / prev_loss < 0.001:
                # Loss is decreasing slowly: increase the learning rate
                self.learning_rate *= 1.05

        # Keep the learning rate within a fixed band around the initial value
        self.learning_rate = np.clip(self.learning_rate,
                                     self.initial_lr * 0.01,
                                     self.initial_lr * 10)

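    # Illustrative note (not in the original): a worked trace of the rule
    # above with initial_lr = 0.01, following the arithmetic exactly:
    #   epoch 1, loss went up              -> lr = 0.01  * 0.9  = 0.009
    #   epoch 2, relative drop only 0.05%  -> lr = 0.009 * 1.05 = 0.00945
    #   epoch 3, relative drop >= 0.1%     -> lr stays at 0.00945
    # The final clip keeps lr within [0.0001, 0.1] at every step.
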
    def fit(self, X: np.ndarray, y: np.ndarray):
        """Train the network with full-batch gradient descent."""
        # One-hot encode integer labels,
        # e.g. y = [0, 2, 1] -> [[1, 0, 0], [0, 0, 1], [0, 1, 0]]
        if len(y.shape) == 1:
            y_encoded = np.zeros((len(y), len(np.unique(y))))
            for i, label in enumerate(np.unique(y)):
                y_encoded[y == label, i] = 1
            y = y_encoded

        self._initialize_weights(X.shape[1], y.shape[1])

        prev_loss = float('inf')

        for epoch in range(self.max_epochs):
            # Forward pass
            activations = self._forward_pass(X)

            # Compute the loss (mean squared error)
            loss = np.mean((activations[-1] - y) ** 2)
            self.loss_history.append(loss)

            # Dynamically adjust the learning rate
            self._adaptive_learning_rate(epoch, loss, prev_loss)

            # Backward pass
            dw, db = self._backward_pass(X, y, activations)

            # Update weights and biases
            for i in range(len(self.weights)):
                self.weights[i] -= self.learning_rate * dw[i]
                self.biases[i] -= self.learning_rate * db[i]

            # Check for convergence
            if abs(prev_loss - loss) < self.tolerance:
                print(f"Training converged at epoch {epoch+1}")
                break

            prev_loss = loss

            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Loss: {loss:.6f}, LR: {self.learning_rate:.6f}")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class indices."""
        activations = self._forward_pass(X)
        return np.argmax(activations[-1], axis=1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Predict class scores (raw sigmoid outputs, not normalized to sum to 1)."""
        activations = self._forward_pass(X)
        return activations[-1]


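# A minimal sketch (not part of the original code) of a finite-difference
# gradient check for _backward_pass. Because the output delta is (a - y)
# with sigmoid outputs, the matching loss is the cross-entropy below rather
# than the MSE that fit() reports; the helper names _bce_loss and
# gradient_check are illustrative choices.
def _bce_loss(net: ImprovedBPNetwork, X: np.ndarray, y_onehot: np.ndarray) -> float:
    """Cross-entropy loss whose pre-activation gradient is the (a - y) delta."""
    a = np.clip(net._forward_pass(X)[-1], 1e-12, 1 - 1e-12)
    return float(-np.mean(np.sum(y_onehot * np.log(a)
                                 + (1 - y_onehot) * np.log(1 - a), axis=1)))


def gradient_check(net: ImprovedBPNetwork, X: np.ndarray, y_onehot: np.ndarray,
                   eps: float = 1e-5) -> None:
    """Compare one analytic weight gradient against a central difference."""
    dw, _ = net._backward_pass(X, y_onehot, net._forward_pass(X))
    orig = net.weights[0][0, 0]
    net.weights[0][0, 0] = orig + eps
    loss_plus = _bce_loss(net, X, y_onehot)
    net.weights[0][0, 0] = orig - eps
    loss_minus = _bce_loss(net, X, y_onehot)
    net.weights[0][0, 0] = orig  # restore the perturbed weight
    numeric = (loss_plus - loss_minus) / (2 * eps)
    print(f"analytic {dw[0][0, 0]:.8f} vs numeric {numeric:.8f}")


# Example usage (assumes an already-initialized network):
#   net = ImprovedBPNetwork([5]); net._initialize_weights(2, 2)
#   X = np.random.rand(10, 2); y1h = np.eye(2)[np.random.randint(0, 2, 10)]
#   gradient_check(net, X, y1h)

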
class StandardBPNetwork:
    """Standard BP neural network (fixed learning rate)."""

    def __init__(self, hidden_layers: List[int], learning_rate: float = 0.01,
                 max_epochs: int = 1000, tolerance: float = 1e-6):
        self.hidden_layers = hidden_layers
        self.learning_rate = learning_rate
        self.max_epochs = max_epochs
        self.tolerance = tolerance
        self.weights = []
        self.biases = []
        self.loss_history = []

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Sigmoid activation function."""
        x = np.clip(x, -500, 500)  # prevent overflow in np.exp
        return 1 / (1 + np.exp(-x))

    def _sigmoid_derivative(self, x: np.ndarray) -> np.ndarray:
        """Derivative of the sigmoid function."""
        s = self._sigmoid(x)
        return s * (1 - s)

    def _initialize_weights(self, input_size: int, output_size: int):
        """Initialize weights and biases (Xavier initialization)."""
        self.weights = []
        self.biases = []

        layers = [input_size] + self.hidden_layers + [output_size]

        for i in range(len(layers) - 1):
            w = np.random.normal(0, np.sqrt(2.0 / (layers[i] + layers[i+1])),
                                 (layers[i], layers[i+1]))
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)

    def _forward_pass(self, X: np.ndarray) -> List[np.ndarray]:
        """Forward propagation."""
        activations = [X]

        for i in range(len(self.weights)):
            z = np.dot(activations[-1], self.weights[i]) + self.biases[i]
            a = self._sigmoid(z)
            activations.append(a)

        return activations

    def _backward_pass(self, X: np.ndarray, y: np.ndarray, activations: List[np.ndarray]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Backpropagation."""
        m = X.shape[0]
        dw = [np.zeros_like(w) for w in self.weights]
        db = [np.zeros_like(b) for b in self.biases]

        delta = activations[-1] - y

        for i in range(len(self.weights) - 1, -1, -1):
            dw[i] = np.dot(activations[i].T, delta) / m
            db[i] = np.mean(delta, axis=0, keepdims=True)

            if i > 0:
                # Same as in ImprovedBPNetwork: sigmoid'(z_i) expressed through
                # the stored activation, activations[i] * (1 - activations[i])
                delta = np.dot(delta, self.weights[i].T) * activations[i] * (1 - activations[i])

        return dw, db

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Train the network with full-batch gradient descent."""
        # One-hot encode integer labels
        if len(y.shape) == 1:
            y_encoded = np.zeros((len(y), len(np.unique(y))))
            for i, label in enumerate(np.unique(y)):
                y_encoded[y == label, i] = 1
            y = y_encoded

        self._initialize_weights(X.shape[1], y.shape[1])

        prev_loss = float('inf')

        for epoch in range(self.max_epochs):
            activations = self._forward_pass(X)
            loss = np.mean((activations[-1] - y) ** 2)
            self.loss_history.append(loss)

            dw, db = self._backward_pass(X, y, activations)

            for i in range(len(self.weights)):
                self.weights[i] -= self.learning_rate * dw[i]
                self.biases[i] -= self.learning_rate * db[i]

            if abs(prev_loss - loss) < self.tolerance:
                print(f"Standard BP training converged at epoch {epoch+1}")
                break

            prev_loss = loss

            if epoch % 100 == 0:
                print(f"Standard BP Epoch {epoch}, Loss: {loss:.6f}")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class indices."""
        activations = self._forward_pass(X)
        return np.argmax(activations[-1], axis=1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Predict class scores (raw sigmoid outputs, not normalized)."""
        activations = self._forward_pass(X)
        return activations[-1]


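# Illustrative usage (not part of the original file): compare the adaptive and
# fixed-learning-rate networks on a small synthetic two-class problem. The
# data, layer sizes, and hyperparameters are arbitrary choices for the demo.
if __name__ == "__main__":
    rng = np.random.default_rng(42)

    # Two Gaussian blobs, 100 samples per class, 2 features each
    X = np.vstack([rng.normal(-1.0, 0.5, (100, 2)),
                   rng.normal(1.0, 0.5, (100, 2))])
    y = np.array([0] * 100 + [1] * 100)

    improved = ImprovedBPNetwork(hidden_layers=[8], learning_rate=0.5,
                                 max_epochs=1000)
    improved.fit(X, y)

    standard = StandardBPNetwork(hidden_layers=[8], learning_rate=0.5,
                                 max_epochs=1000)
    standard.fit(X, y)

    print(f"Improved BP accuracy: {np.mean(improved.predict(X) == y):.3f}")
    print(f"Standard BP accuracy: {np.mean(standard.predict(X) == y):.3f}")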