feat: Implement comprehensive machine learning models and evaluations for Online Shoppers Intention and Breast Cancer datasets

grtsinry43 2025-06-02 14:01:17 +08:00
parent 047b03a590
commit 8e06e86972
Signed by: grtsinry43
GPG Key ID: F3305FB3A978C934
9 changed files with 2430 additions and 2107 deletions


@@ -1,220 +0,0 @@
import numpy as np
from typing import Dict, List, Tuple
from collections import Counter


class NaiveBayesClassifier:
    """Gaussian naive Bayes classifier."""

    def __init__(self):
        self.class_priors = {}
        self.feature_likelihoods = {}
        self.classes = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the naive Bayes classifier."""
        self.classes = np.unique(y)
        n_samples, n_features = X.shape
        # Class prior probabilities
        for c in self.classes:
            self.class_priors[c] = np.sum(y == c) / n_samples
        # Per-class Gaussian likelihood parameters for each feature
        self.feature_likelihoods = {}
        for c in self.classes:
            class_data = X[y == c]
            self.feature_likelihoods[c] = {
                'mean': np.mean(class_data, axis=0),
                'var': np.var(class_data, axis=0) + 1e-10  # avoid division by zero
            }

    def _gaussian_probability(self, x: float, mean: float, var: float) -> float:
        """Gaussian probability density."""
        return (1 / np.sqrt(2 * np.pi * var)) * np.exp(-0.5 * ((x - mean) ** 2) / var)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class labels."""
        predictions = []
        for sample in X:
            class_scores = {}
            for c in self.classes:
                # Posterior in log space to avoid numerical underflow
                log_prob = np.log(self.class_priors[c])
                for i, feature_value in enumerate(sample):
                    mean = self.feature_likelihoods[c]['mean'][i]
                    var = self.feature_likelihoods[c]['var'][i]
                    log_prob += np.log(self._gaussian_probability(feature_value, mean, var) + 1e-300)  # guard against log(0)
                class_scores[c] = log_prob
            # Pick the class with the highest posterior score
            predicted_class = max(class_scores, key=class_scores.get)
            predictions.append(predicted_class)
        return np.array(predictions)


class KNNClassifier:
    """k-nearest-neighbour classifier."""

    def __init__(self, k: int = 3):
        self.k = k
        self.X_train = None
        self.y_train = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the KNN classifier (simply stores the training data)."""
        self.X_train = X
        self.y_train = y

    def _euclidean_distance(self, x1: np.ndarray, x2: np.ndarray) -> float:
        """Euclidean distance between two samples."""
        return np.sqrt(np.sum((x1 - x2) ** 2))

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class labels."""
        predictions = []
        for sample in X:
            # Distances to every training sample
            distances = []
            for i, train_sample in enumerate(self.X_train):
                dist = self._euclidean_distance(sample, train_sample)
                distances.append((dist, self.y_train[i]))
            # Keep the k nearest neighbours
            distances.sort(key=lambda x: x[0])
            k_nearest = distances[:self.k]
            # Majority vote decides the class
            votes = [label for _, label in k_nearest]
            predicted_class = max(set(votes), key=votes.count)
            predictions.append(predicted_class)
        return np.array(predictions)


class DecisionTreeNode:
    """Node of a decision tree."""

    def __init__(self):
        self.feature_idx = None
        self.threshold = None
        self.left = None
        self.right = None
        self.prediction = None
        self.is_leaf = False


class DecisionTreeClassifier:
    """Decision tree classifier (binary splits, Gini impurity)."""

    def __init__(self, max_depth: int = 10, min_samples_split: int = 2):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.root = None

    def _gini_impurity(self, y: np.ndarray) -> float:
        """Gini impurity of a label set."""
        if len(y) == 0:
            return 0
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return 1 - np.sum(probabilities ** 2)

    def _information_gain(self, y: np.ndarray, y_left: np.ndarray, y_right: np.ndarray) -> float:
        """Impurity reduction achieved by a split."""
        n = len(y)
        n_left, n_right = len(y_left), len(y_right)
        if n_left == 0 or n_right == 0:
            return 0
        gini_parent = self._gini_impurity(y)
        gini_children = (n_left / n) * self._gini_impurity(y_left) + (n_right / n) * self._gini_impurity(y_right)
        return gini_parent - gini_children

    def _best_split(self, X: np.ndarray, y: np.ndarray) -> Tuple[int, float, float]:
        """Find the best (feature, threshold) split."""
        best_gain = 0
        best_feature_idx = None
        best_threshold = None
        n_features = X.shape[1]
        for feature_idx in range(n_features):
            feature_values = X[:, feature_idx]
            thresholds = np.unique(feature_values)
            for threshold in thresholds:
                left_mask = feature_values <= threshold
                right_mask = ~left_mask
                if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
                    continue
                y_left, y_right = y[left_mask], y[right_mask]
                gain = self._information_gain(y, y_left, y_right)
                if gain > best_gain:
                    best_gain = gain
                    best_feature_idx = feature_idx
                    best_threshold = threshold
        return best_feature_idx, best_threshold, best_gain

    def _build_tree(self, X: np.ndarray, y: np.ndarray, depth: int = 0) -> DecisionTreeNode:
        """Recursively build the tree."""
        node = DecisionTreeNode()
        # Stopping criteria
        if (depth >= self.max_depth or
                len(np.unique(y)) == 1 or
                len(y) < self.min_samples_split):
            node.is_leaf = True
            node.prediction = max(set(y), key=list(y).count)
            return node
        # Find the best split
        feature_idx, threshold, gain = self._best_split(X, y)
        if gain == 0:
            node.is_leaf = True
            node.prediction = max(set(y), key=list(y).count)
            return node
        # Split the data and recurse
        left_mask = X[:, feature_idx] <= threshold
        right_mask = ~left_mask
        node.feature_idx = feature_idx
        node.threshold = threshold
        node.left = self._build_tree(X[left_mask], y[left_mask], depth + 1)
        node.right = self._build_tree(X[right_mask], y[right_mask], depth + 1)
        return node

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the decision tree."""
        self.root = self._build_tree(X, y)

    def _predict_sample(self, sample: np.ndarray, node: DecisionTreeNode):
        """Predict a single sample by walking the tree."""
        if node.is_leaf:
            return node.prediction
        if sample[node.feature_idx] <= node.threshold:
            return self._predict_sample(sample, node.left)
        else:
            return self._predict_sample(sample, node.right)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class labels."""
        predictions = []
        for sample in X:
            prediction = self._predict_sample(sample, self.root)
            predictions.append(prediction)
        return np.array(predictions)
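
A minimal usage sketch (illustrative only, not part of the commit): it fits each of the three classifiers above on a small synthetic two-class dataset. The toy data, seed, and hyperparameters are arbitrary, and the module is assumed to be importable as classifiers, matching the imports used elsewhere in this commit.

import numpy as np
from classifiers import NaiveBayesClassifier, KNNClassifier, DecisionTreeClassifier

# Two Gaussian blobs in 2-D as a toy two-class problem.
rng = np.random.default_rng(0)
X = np.vstack([rng.normal(0, 1, (50, 2)), rng.normal(3, 1, (50, 2))])
y = np.array([0] * 50 + [1] * 50)

for model in (NaiveBayesClassifier(), KNNClassifier(k=5), DecisionTreeClassifier(max_depth=4)):
    model.fit(X, y)
    acc = np.mean(model.predict(X) == y)  # training accuracy only, as a quick sanity check
    print(type(model).__name__, f"train accuracy: {acc:.3f}")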


@@ -1,142 +0,0 @@
import numpy as np
from typing import List, Any
from classifiers import DecisionTreeClassifier, NaiveBayesClassifier, KNNClassifier


class BaggingClassifier:
    """Bagging ensemble classifier."""

    def __init__(self, base_classifier, n_estimators: int = 10, random_state: int = 42):
        self.base_classifier = base_classifier
        self.n_estimators = n_estimators
        self.random_state = random_state
        self.estimators = []

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the bagging ensemble."""
        np.random.seed(self.random_state)
        n_samples = X.shape[0]
        self.estimators = []
        for i in range(self.n_estimators):
            # Bootstrap sampling
            bootstrap_indices = np.random.choice(n_samples, size=n_samples, replace=True)
            X_bootstrap = X[bootstrap_indices]
            y_bootstrap = y[bootstrap_indices]
            # Train a base classifier on the bootstrap sample
            if self.base_classifier == 'decision_tree':
                estimator = DecisionTreeClassifier(max_depth=8)
            elif self.base_classifier == 'naive_bayes':
                estimator = NaiveBayesClassifier()
            elif self.base_classifier == 'knn':
                estimator = KNNClassifier(k=5)
            else:
                raise ValueError(f"Unknown base classifier: {self.base_classifier}")
            estimator.fit(X_bootstrap, y_bootstrap)
            self.estimators.append(estimator)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class labels."""
        # Collect the predictions of every base classifier
        predictions = np.zeros((X.shape[0], self.n_estimators))
        for i, estimator in enumerate(self.estimators):
            predictions[:, i] = estimator.predict(X)
        # Majority vote decides the final prediction
        final_predictions = []
        for i in range(X.shape[0]):
            votes = predictions[i, :]
            prediction = max(set(votes), key=list(votes).count)
            final_predictions.append(prediction)
        return np.array(final_predictions)


class AdaBoostClassifier:
    """AdaBoost ensemble classifier (the weight update assumes labels in {-1, +1})."""

    def __init__(self, n_estimators: int = 10, random_state: int = 42):
        self.n_estimators = n_estimators
        self.random_state = random_state
        self.estimators = []
        self.estimator_weights = []

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the AdaBoost ensemble."""
        np.random.seed(self.random_state)
        n_samples = X.shape[0]
        # Initialise the sample weights uniformly
        sample_weights = np.ones(n_samples) / n_samples
        self.estimators = []
        self.estimator_weights = []
        for i in range(self.n_estimators):
            # Resample the training set according to the sample weights
            sample_indices = np.random.choice(
                n_samples, size=n_samples, replace=True, p=sample_weights
            )
            X_weighted = X[sample_indices]
            y_weighted = y[sample_indices]
            # Train a weak learner (decision stump)
            estimator = DecisionTreeClassifier(max_depth=1)
            estimator.fit(X_weighted, y_weighted)
            # Weighted training error
            y_pred = estimator.predict(X)
            error_mask = y_pred != y
            error_rate = np.average(error_mask, weights=sample_weights)
            # Stop if the weak learner is no better than chance
            if error_rate >= 0.5:
                break
            # Classifier weight
            alpha = 0.5 * np.log((1 - error_rate) / (error_rate + 1e-10))
            # Update the sample weights (y and y_pred are expected to be in {-1, +1})
            sample_weights *= np.exp(-alpha * y * y_pred)
            sample_weights /= np.sum(sample_weights)
            self.estimators.append(estimator)
            self.estimator_weights.append(alpha)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict labels as the sign of the weighted vote."""
        n_samples = X.shape[0]
        predictions = np.zeros(n_samples)
        for estimator, weight in zip(self.estimators, self.estimator_weights):
            y_pred = estimator.predict(X)
            predictions += weight * y_pred
        return np.sign(predictions)


class VotingClassifier:
    """Majority-voting ensemble classifier."""

    def __init__(self, estimators: List[Any]):
        self.estimators = estimators

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit every member classifier."""
        for estimator in self.estimators:
            estimator.fit(X, y)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class labels."""
        predictions = np.zeros((X.shape[0], len(self.estimators)))
        for i, estimator in enumerate(self.estimators):
            predictions[:, i] = estimator.predict(X)
        # Majority vote decides the final prediction
        final_predictions = []
        for i in range(X.shape[0]):
            votes = predictions[i, :]
            prediction = max(set(votes), key=list(votes).count)
            final_predictions.append(prediction)
        return np.array(final_predictions)
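
A usage sketch for the three ensembles (illustrative only, not part of the commit). The toy data are arbitrary; note that Bagging and Voting are given {0, 1} labels, while AdaBoost is given {-1, +1} labels, which is what its weight update and sign-based prediction assume.

import numpy as np
from classifiers import DecisionTreeClassifier, NaiveBayesClassifier, KNNClassifier
from ensemble import BaggingClassifier, AdaBoostClassifier, VotingClassifier

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(0, 1, (60, 4)), rng.normal(2, 1, (60, 4))])
y01 = np.array([0] * 60 + [1] * 60)   # {0, 1} labels for Bagging / Voting
ypm = np.where(y01 == 1, 1, -1)       # {-1, +1} labels for AdaBoost

bag = BaggingClassifier('decision_tree', n_estimators=5)
bag.fit(X, y01)
vote = VotingClassifier([DecisionTreeClassifier(max_depth=4), NaiveBayesClassifier(), KNNClassifier(k=3)])
vote.fit(X, y01)
ada = AdaBoostClassifier(n_estimators=5)
ada.fit(X, ypm)

print("bagging :", np.mean(bag.predict(X) == y01))
print("voting  :", np.mean(vote.predict(X) == y01))
print("adaboost:", np.mean(ada.predict(X) == ypm))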

exper.ipynb (new file, 2430 lines)
File diff suppressed because one or more lines are too long


@@ -1,319 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
import time
from typing import Dict, List, Tuple
from utils import train_test_split, normalize_data, accuracy_score, cross_validation
from feature_extraction import PCA, FeatureSelector
from improved_bp import ImprovedBPNetwork, StandardBPNetwork
from classifiers import NaiveBayesClassifier, KNNClassifier, DecisionTreeClassifier
from ensemble import BaggingClassifier, AdaBoostClassifier, VotingClassifier


class ExperimentRunner:
    """Experiment runner."""

    def __init__(self):
        self.results = {}

    def generate_synthetic_data(self, n_samples: int = 1000, n_features: int = 20, n_classes: int = 3,
                                random_state: int = 42) -> Tuple[np.ndarray, np.ndarray]:
        """Generate a synthetic classification dataset."""
        np.random.seed(random_state)
        # Draw a different mean vector for each class
        class_means = np.random.randn(n_classes, n_features) * 2
        X = []
        y = []
        samples_per_class = n_samples // n_classes
        for class_idx in range(n_classes):
            # Generate the samples of this class
            class_data = np.random.randn(samples_per_class, n_features) + class_means[class_idx]
            X.append(class_data)
            y.extend([class_idx] * samples_per_class)
        X = np.vstack(X)
        y = np.array(y)
        # Append pure-noise features
        noise_features = np.random.randn(len(X), n_features // 2)
        X = np.hstack([X, noise_features])
        return X, y

    def run_bp_comparison(self, X: np.ndarray, y: np.ndarray, dataset_name: str):
        """Compare the improved and standard BP networks."""
        print(f"\n=== BP algorithm comparison - {dataset_name} ===")
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
        X_train_norm, X_test_norm = normalize_data(X_train, X_test)
        # Improved BP network
        print("Training the improved BP network...")
        start_time = time.time()
        improved_bp = ImprovedBPNetwork(hidden_layers=[10, 5], learning_rate=0.01, max_epochs=500)
        improved_bp.fit(X_train_norm, y_train)
        improved_train_time = time.time() - start_time
        y_pred_improved = improved_bp.predict(X_test_norm)
        improved_accuracy = accuracy_score(y_test, y_pred_improved)
        # Standard BP network
        print("Training the standard BP network...")
        start_time = time.time()
        standard_bp = StandardBPNetwork(hidden_layers=[10, 5], learning_rate=0.01, max_epochs=500)
        standard_bp.fit(X_train_norm, y_train)
        standard_train_time = time.time() - start_time
        y_pred_standard = standard_bp.predict(X_test_norm)
        standard_accuracy = accuracy_score(y_test, y_pred_standard)
        # Report results
        print(f"Improved BP - accuracy: {improved_accuracy:.4f}, training time: {improved_train_time:.2f}s")
        print(f"Standard BP - accuracy: {standard_accuracy:.4f}, training time: {standard_train_time:.2f}s")
        # Plot the loss curves
        plt.figure(figsize=(10, 6))
        plt.plot(improved_bp.loss_history, label='Improved BP', alpha=0.8)
        plt.plot(standard_bp.loss_history, label='Standard BP', alpha=0.8)
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title(f'BP loss curve comparison - {dataset_name}')
        plt.legend()
        plt.grid(True)
        plt.savefig(f'c:/Users/grtsi/ml-homework/bp_comparison_{dataset_name.lower()}.png')
        plt.show()
        return {
            'improved_bp': {'accuracy': improved_accuracy, 'time': improved_train_time},
            'standard_bp': {'accuracy': standard_accuracy, 'time': standard_train_time}
        }

    def run_feature_extraction_comparison(self, X: np.ndarray, y: np.ndarray, dataset_name: str):
        """Compare classifiers with and without feature extraction."""
        print(f"\n=== Feature extraction comparison - {dataset_name} ===")
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
        X_train_norm, X_test_norm = normalize_data(X_train, X_test)
        classifiers = {
            'NaiveBayes': NaiveBayesClassifier(),
            'KNN': KNNClassifier(k=5),
            'DecisionTree': DecisionTreeClassifier(max_depth=8)
        }
        results = {}
        for clf_name, clf in classifiers.items():
            print(f"\n{clf_name} classifier:")
            # No feature extraction
            clf_no_fe = type(clf)() if clf_name != 'KNN' else KNNClassifier(k=5)
            clf_no_fe.fit(X_train_norm, y_train)
            y_pred_no_fe = clf_no_fe.predict(X_test_norm)
            acc_no_fe = accuracy_score(y_test, y_pred_no_fe)
            # PCA feature extraction
            pca = PCA(n_components=min(10, X.shape[1] // 2))
            X_train_pca = pca.fit_transform(X_train_norm)
            X_test_pca = pca.transform(X_test_norm)
            clf_pca = type(clf)() if clf_name != 'KNN' else KNNClassifier(k=5)
            clf_pca.fit(X_train_pca, y_train)
            y_pred_pca = clf_pca.predict(X_test_pca)
            acc_pca = accuracy_score(y_test, y_pred_pca)
            # Feature selection
            feature_selector = FeatureSelector(k=min(10, X.shape[1] // 2))
            X_train_fs = feature_selector.fit_transform(X_train_norm, y_train)
            X_test_fs = feature_selector.transform(X_test_norm)
            clf_fs = type(clf)() if clf_name != 'KNN' else KNNClassifier(k=5)
            clf_fs.fit(X_train_fs, y_train)
            y_pred_fs = clf_fs.predict(X_test_fs)
            acc_fs = accuracy_score(y_test, y_pred_fs)
            print(f"  no feature extraction: {acc_no_fe:.4f}")
            print(f"  PCA: {acc_pca:.4f}")
            print(f"  feature selection: {acc_fs:.4f}")
            results[clf_name] = {
                'no_feature_extraction': acc_no_fe,
                'pca': acc_pca,
                'feature_selection': acc_fs
            }
        return results

    def run_classifier_comparison(self, X: np.ndarray, y: np.ndarray, dataset_name: str):
        """Compare the individual classifiers."""
        print(f"\n=== Classifier comparison - {dataset_name} ===")
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
        X_train_norm, X_test_norm = normalize_data(X_train, X_test)
        classifiers = {
            'NaiveBayes': NaiveBayesClassifier(),
            'KNN': KNNClassifier(k=5),
            'DecisionTree': DecisionTreeClassifier(max_depth=8)
        }
        results = {}
        for clf_name, clf in classifiers.items():
            print(f"\n{clf_name} classifier:")
            # Train and evaluate on the held-out test set
            start_time = time.time()
            clf.fit(X_train_norm, y_train)
            train_time = time.time() - start_time
            y_pred = clf.predict(X_test_norm)
            accuracy = accuracy_score(y_test, y_pred)
            # Cross-validation on the training set
            cv_scores = cross_validation(type(clf)() if clf_name != 'KNN' else KNNClassifier(k=5),
                                         X_train_norm, y_train, k=5)
            print(f"  accuracy: {accuracy:.4f}")
            print(f"  training time: {train_time:.4f}s")
            print(f"  cross-validation mean: {np.mean(cv_scores):.4f} ± {np.std(cv_scores):.4f}")
            results[clf_name] = {
                'accuracy': accuracy,
                'train_time': train_time,
                'cv_mean': np.mean(cv_scores),
                'cv_std': np.std(cv_scores)
            }
        return results

    def run_ensemble_comparison(self, X: np.ndarray, y: np.ndarray, dataset_name: str):
        """Compare base classifiers with ensemble methods."""
        print(f"\n=== Ensemble comparison - {dataset_name} ===")
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
        X_train_norm, X_test_norm = normalize_data(X_train, X_test)
        # Base classifiers
        base_classifiers = {
            'DecisionTree': DecisionTreeClassifier(max_depth=8),
            'NaiveBayes': NaiveBayesClassifier(),
            'KNN': KNNClassifier(k=5)
        }
        # Ensemble classifiers
        ensemble_classifiers = {
            'Bagging_DT': BaggingClassifier('decision_tree', n_estimators=10),
            'Voting': VotingClassifier([
                DecisionTreeClassifier(max_depth=8),
                NaiveBayesClassifier(),
                KNNClassifier(k=5)
            ])
        }
        results = {}
        # Evaluate the base classifiers
        print("Base classifiers:")
        for clf_name, clf in base_classifiers.items():
            start_time = time.time()
            clf.fit(X_train_norm, y_train)
            train_time = time.time() - start_time
            y_pred = clf.predict(X_test_norm)
            accuracy = accuracy_score(y_test, y_pred)
            print(f"  {clf_name}: {accuracy:.4f} (training time: {train_time:.4f}s)")
            results[clf_name] = {'accuracy': accuracy, 'train_time': train_time}
        # Evaluate the ensemble classifiers
        print("\nEnsemble classifiers:")
        for clf_name, clf in ensemble_classifiers.items():
            start_time = time.time()
            clf.fit(X_train_norm, y_train)
            train_time = time.time() - start_time
            y_pred = clf.predict(X_test_norm)
            accuracy = accuracy_score(y_test, y_pred)
            print(f"  {clf_name}: {accuracy:.4f} (training time: {train_time:.4f}s)")
            results[clf_name] = {'accuracy': accuracy, 'train_time': train_time}
        return results

    def run_all_experiments(self):
        """Run the full experiment suite."""
        print("Starting the machine learning algorithm comparison experiments...")
        # Generate two different datasets
        print("Generating datasets...")
        X1, y1 = self.generate_synthetic_data(n_samples=800, n_features=20, n_classes=3, random_state=42)
        X2, y2 = self.generate_synthetic_data(n_samples=1000, n_features=25, n_classes=4, random_state=123)
        datasets = [
            (X1, y1, "Dataset1"),
            (X2, y2, "Dataset2")
        ]
        all_results = {}
        for X, y, dataset_name in datasets:
            print(f"\n{'='*50}")
            print(f"Processing dataset: {dataset_name}")
            print(f"samples: {X.shape[0]}, features: {X.shape[1]}, classes: {len(np.unique(y))}")
            # Run every experiment on this dataset
            bp_results = self.run_bp_comparison(X, y, dataset_name)
            fe_results = self.run_feature_extraction_comparison(X, y, dataset_name)
            clf_results = self.run_classifier_comparison(X, y, dataset_name)
            ensemble_results = self.run_ensemble_comparison(X, y, dataset_name)
            all_results[dataset_name] = {
                'bp_comparison': bp_results,
                'feature_extraction': fe_results,
                'classifier_comparison': clf_results,
                'ensemble_comparison': ensemble_results
            }
        # Produce the summary report
        self.generate_summary_report(all_results)
        return all_results

    def generate_summary_report(self, results: Dict):
        """Print a summary report of all experiments."""
        print(f"\n{'='*60}")
        print("Experiment summary report")
        print(f"{'='*60}")
        for dataset_name, dataset_results in results.items():
            print(f"\n{dataset_name} summary:")
            print("-" * 40)
            # BP comparison
            bp_results = dataset_results['bp_comparison']
            print("BP algorithm comparison:")
            print(f"  Improved BP: accuracy {bp_results['improved_bp']['accuracy']:.4f}, time {bp_results['improved_bp']['time']:.2f}s")
            print(f"  Standard BP: accuracy {bp_results['standard_bp']['accuracy']:.4f}, time {bp_results['standard_bp']['time']:.2f}s")
            # Feature extraction comparison
            fe_results = dataset_results['feature_extraction']
            print("\nBest feature extraction method per classifier:")
            for clf_name, clf_results in fe_results.items():
                best_method = max(clf_results, key=clf_results.get)
                best_acc = clf_results[best_method]
                print(f"  {clf_name}: {best_method} ({best_acc:.4f})")
            # Ensemble comparison
            ensemble_results = dataset_results['ensemble_comparison']
            print("\nClassifier performance ranking:")
            sorted_classifiers = sorted(ensemble_results.items(),
                                        key=lambda x: x[1]['accuracy'], reverse=True)
            for i, (clf_name, clf_result) in enumerate(sorted_classifiers[:5]):
                print(f"  {i+1}. {clf_name}: {clf_result['accuracy']:.4f}")


if __name__ == "__main__":
    runner = ExperimentRunner()
    results = runner.run_all_experiments()
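
A lighter smoke-test sketch (illustrative only, not part of the commit): instead of the full suite, it runs a single sub-experiment on a small synthetic dataset. The sample counts are arbitrary, and the module is assumed to be importable as experiments, matching the import in main.py.

from experiments import ExperimentRunner

# Run only the classifier comparison on a small dataset as a quick check.
runner = ExperimentRunner()
X, y = runner.generate_synthetic_data(n_samples=300, n_features=10, n_classes=3)
clf_results = runner.run_classifier_comparison(X, y, "SmokeTest")
print(clf_results)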


@@ -1,96 +0,0 @@
import numpy as np
from typing import Tuple


class PCA:
    """Principal component analysis."""

    def __init__(self, n_components: int):
        self.n_components = n_components
        self.components_ = None
        self.mean_ = None
        self.explained_variance_ratio_ = None

    def fit(self, X: np.ndarray) -> 'PCA':
        """Fit the PCA model."""
        self.mean_ = np.mean(X, axis=0)
        X_centered = X - self.mean_
        # Covariance matrix of the centred data
        cov_matrix = np.cov(X_centered, rowvar=False)
        # Eigen-decomposition
        eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)
        # Sort by eigenvalue in descending order
        idx = np.argsort(eigenvalues)[::-1]
        eigenvalues = eigenvalues[idx]
        eigenvectors = eigenvectors[:, idx]
        # Keep the first n_components principal components
        self.components_ = eigenvectors[:, :self.n_components].T
        self.explained_variance_ratio_ = eigenvalues[:self.n_components] / np.sum(eigenvalues)
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Project the data onto the principal components."""
        X_centered = X - self.mean_
        return np.dot(X_centered, self.components_.T)

    def fit_transform(self, X: np.ndarray) -> np.ndarray:
        """Fit, then transform."""
        return self.fit(X).transform(X)


class FeatureSelector:
    """Feature selection based on information gain."""

    def __init__(self, k: int):
        self.k = k
        self.selected_features_ = None

    def _entropy(self, y: np.ndarray) -> float:
        """Shannon entropy of the labels."""
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / len(y)
        return -np.sum(probabilities * np.log2(probabilities + 1e-10))

    def _information_gain(self, X_feature: np.ndarray, y: np.ndarray) -> float:
        """Information gain of a single feature."""
        # Discretise continuous features into 10 bins
        if len(np.unique(X_feature)) > 10:
            bins = np.linspace(np.min(X_feature), np.max(X_feature), 11)
            X_feature = np.digitize(X_feature, bins)
        total_entropy = self._entropy(y)
        values, counts = np.unique(X_feature, return_counts=True)
        weighted_entropy = 0
        for value, count in zip(values, counts):
            subset_y = y[X_feature == value]
            weighted_entropy += (count / len(y)) * self._entropy(subset_y)
        return total_entropy - weighted_entropy

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'FeatureSelector':
        """Score every feature and keep the top k."""
        n_features = X.shape[1]
        feature_scores = []
        for i in range(n_features):
            score = self._information_gain(X[:, i], y)
            feature_scores.append((i, score))
        # Sort by information gain
        feature_scores.sort(key=lambda x: x[1], reverse=True)
        self.selected_features_ = [idx for idx, _ in feature_scores[:self.k]]
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Keep only the selected features."""
        return X[:, self.selected_features_]

    def fit_transform(self, X: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Fit, then transform."""
        return self.fit(X, y).transform(X)
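
A short usage sketch of the two transformers (illustrative only, not part of the commit). The toy data and the choice of k / n_components are arbitrary; the module is assumed to be importable as feature_extraction, matching the import in experiments.py.

import numpy as np
from feature_extraction import PCA, FeatureSelector

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 8))
y = (X[:, 0] + X[:, 1] > 0).astype(int)  # labels driven by the first two features

pca = PCA(n_components=3)
X_pca = pca.fit_transform(X)
print("PCA output shape:", X_pca.shape)
print("explained variance ratio:", pca.explained_variance_ratio_)

selector = FeatureSelector(k=3)
X_sel = selector.fit_transform(X, y)
print("selected feature indices:", selector.selected_features_)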


@@ -1,248 +0,0 @@
import numpy as np
from typing import List, Tuple


class ImprovedBPNetwork:
    """Improved BP neural network with dynamic learning-rate adjustment."""

    def __init__(self, hidden_layers: List[int], learning_rate: float = 0.01,
                 max_epochs: int = 1000, tolerance: float = 1e-6):
        self.hidden_layers = hidden_layers
        self.initial_lr = learning_rate
        self.learning_rate = learning_rate
        self.max_epochs = max_epochs
        self.tolerance = tolerance
        self.weights = []
        self.biases = []
        self.loss_history = []

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Sigmoid activation."""
        x = np.clip(x, -500, 500)  # prevent overflow
        return 1 / (1 + np.exp(-x))

    def _sigmoid_derivative(self, x: np.ndarray) -> np.ndarray:
        """Derivative of the sigmoid."""
        s = self._sigmoid(x)
        return s * (1 - s)

    def _initialize_weights(self, input_size: int, output_size: int):
        """Initialise weights and biases."""
        self.weights = []
        self.biases = []
        # Layer sizes of the full network
        layers = [input_size] + self.hidden_layers + [output_size]
        # Xavier-style initialisation
        for i in range(len(layers) - 1):
            w = np.random.normal(0, np.sqrt(2.0 / (layers[i] + layers[i+1])),
                                 (layers[i], layers[i+1]))
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)

    def _forward_pass(self, X: np.ndarray) -> List[np.ndarray]:
        """Forward pass; returns the activations of every layer."""
        activations = [X]
        for i in range(len(self.weights)):
            z = np.dot(activations[-1], self.weights[i]) + self.biases[i]
            a = self._sigmoid(z)
            activations.append(a)
        return activations

    def _backward_pass(self, X: np.ndarray, y: np.ndarray, activations: List[np.ndarray]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Backward pass; returns the weight and bias gradients."""
        m = X.shape[0]
        dw = [np.zeros_like(w) for w in self.weights]
        db = [np.zeros_like(b) for b in self.biases]
        # Output-layer error
        delta = activations[-1] - y
        # Propagate the error from the output layer back towards the input layer
        for i in range(len(self.weights) - 1, -1, -1):
            dw[i] = np.dot(activations[i].T, delta) / m
            db[i] = np.mean(delta, axis=0, keepdims=True)
            if i > 0:
                # sigma'(z_i), where z_i is the pre-activation that produced activations[i]
                delta = np.dot(delta, self.weights[i].T) * self._sigmoid_derivative(
                    np.dot(activations[i - 1], self.weights[i - 1]) + self.biases[i - 1])
        return dw, db

    def _adaptive_learning_rate(self, epoch: int, current_loss: float, prev_loss: float):
        """Adapt the learning rate based on the loss trend."""
        if epoch > 0:
            if current_loss > prev_loss:
                # Loss increased: shrink the learning rate
                self.learning_rate *= 0.9
            elif (prev_loss - current_loss) / prev_loss < 0.001:
                # Loss is decreasing slowly: grow the learning rate
                self.learning_rate *= 1.05
            # Keep the learning rate within a bounded range
            self.learning_rate = np.clip(self.learning_rate,
                                         self.initial_lr * 0.01,
                                         self.initial_lr * 10)

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Train the network."""
        # One-hot encode 1-D label vectors
        if len(y.shape) == 1:
            y_encoded = np.zeros((len(y), len(np.unique(y))))
            for i, label in enumerate(np.unique(y)):
                y_encoded[y == label, i] = 1
            y = y_encoded
        self._initialize_weights(X.shape[1], y.shape[1])
        prev_loss = float('inf')
        for epoch in range(self.max_epochs):
            # Forward pass
            activations = self._forward_pass(X)
            # Mean squared error loss
            loss = np.mean((activations[-1] - y) ** 2)
            self.loss_history.append(loss)
            # Adapt the learning rate
            self._adaptive_learning_rate(epoch, loss, prev_loss)
            # Backward pass
            dw, db = self._backward_pass(X, y, activations)
            # Update weights and biases
            for i in range(len(self.weights)):
                self.weights[i] -= self.learning_rate * dw[i]
                self.biases[i] -= self.learning_rate * db[i]
            # Convergence check
            if abs(prev_loss - loss) < self.tolerance:
                print(f"Training converged at epoch {epoch+1}")
                break
            prev_loss = loss
            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Loss: {loss:.6f}, LR: {self.learning_rate:.6f}")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class indices."""
        activations = self._forward_pass(X)
        return np.argmax(activations[-1], axis=1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the raw output activations as class scores."""
        activations = self._forward_pass(X)
        return activations[-1]


class StandardBPNetwork:
    """Standard BP neural network with a fixed learning rate."""

    def __init__(self, hidden_layers: List[int], learning_rate: float = 0.01,
                 max_epochs: int = 1000, tolerance: float = 1e-6):
        self.hidden_layers = hidden_layers
        self.learning_rate = learning_rate
        self.max_epochs = max_epochs
        self.tolerance = tolerance
        self.weights = []
        self.biases = []
        self.loss_history = []

    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        """Sigmoid activation."""
        x = np.clip(x, -500, 500)
        return 1 / (1 + np.exp(-x))

    def _sigmoid_derivative(self, x: np.ndarray) -> np.ndarray:
        """Derivative of the sigmoid."""
        s = self._sigmoid(x)
        return s * (1 - s)

    def _initialize_weights(self, input_size: int, output_size: int):
        """Initialise weights and biases."""
        self.weights = []
        self.biases = []
        layers = [input_size] + self.hidden_layers + [output_size]
        for i in range(len(layers) - 1):
            w = np.random.normal(0, np.sqrt(2.0 / (layers[i] + layers[i+1])),
                                 (layers[i], layers[i+1]))
            b = np.zeros((1, layers[i+1]))
            self.weights.append(w)
            self.biases.append(b)

    def _forward_pass(self, X: np.ndarray) -> List[np.ndarray]:
        """Forward pass; returns the activations of every layer."""
        activations = [X]
        for i in range(len(self.weights)):
            z = np.dot(activations[-1], self.weights[i]) + self.biases[i]
            a = self._sigmoid(z)
            activations.append(a)
        return activations

    def _backward_pass(self, X: np.ndarray, y: np.ndarray, activations: List[np.ndarray]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Backward pass; returns the weight and bias gradients."""
        m = X.shape[0]
        dw = [np.zeros_like(w) for w in self.weights]
        db = [np.zeros_like(b) for b in self.biases]
        delta = activations[-1] - y
        for i in range(len(self.weights) - 1, -1, -1):
            dw[i] = np.dot(activations[i].T, delta) / m
            db[i] = np.mean(delta, axis=0, keepdims=True)
            if i > 0:
                # sigma'(z_i), where z_i is the pre-activation that produced activations[i]
                delta = np.dot(delta, self.weights[i].T) * self._sigmoid_derivative(
                    np.dot(activations[i - 1], self.weights[i - 1]) + self.biases[i - 1])
        return dw, db

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Train the network."""
        if len(y.shape) == 1:
            y_encoded = np.zeros((len(y), len(np.unique(y))))
            for i, label in enumerate(np.unique(y)):
                y_encoded[y == label, i] = 1
            y = y_encoded
        self._initialize_weights(X.shape[1], y.shape[1])
        prev_loss = float('inf')
        for epoch in range(self.max_epochs):
            activations = self._forward_pass(X)
            loss = np.mean((activations[-1] - y) ** 2)
            self.loss_history.append(loss)
            dw, db = self._backward_pass(X, y, activations)
            for i in range(len(self.weights)):
                self.weights[i] -= self.learning_rate * dw[i]
                self.biases[i] -= self.learning_rate * db[i]
            if abs(prev_loss - loss) < self.tolerance:
                print(f"Standard BP training converged at epoch {epoch+1}")
                break
            prev_loss = loss
            if epoch % 100 == 0:
                print(f"Standard BP Epoch {epoch}, Loss: {loss:.6f}")

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict class indices."""
        activations = self._forward_pass(X)
        return np.argmax(activations[-1], axis=1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the raw output activations as class scores."""
        activations = self._forward_pass(X)
        return activations[-1]
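
A small comparison sketch for the two networks (illustrative only, not part of the commit). The toy data, layer sizes, learning rate, and epoch count are arbitrary; inputs are standardised because both networks use sigmoid units, and the module is assumed to be importable as improved_bp, matching the import in experiments.py.

import numpy as np
from improved_bp import ImprovedBPNetwork, StandardBPNetwork

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(0, 1, (80, 5)), rng.normal(2, 1, (80, 5))])
y = np.array([0] * 80 + [1] * 80)
X = (X - X.mean(axis=0)) / X.std(axis=0)  # standardise the inputs

for net in (ImprovedBPNetwork(hidden_layers=[8], learning_rate=0.1, max_epochs=300),
            StandardBPNetwork(hidden_layers=[8], learning_rate=0.1, max_epochs=300)):
    net.fit(X, y)
    print(type(net).__name__,
          "final loss:", net.loss_history[-1],
          "train accuracy:", np.mean(net.predict(X) == y))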

File diff suppressed because one or more lines are too long

main.py (29 lines)

@@ -1,29 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
from experiments import ExperimentRunner


def main():
    """Entry point."""
    print("Machine learning algorithm implementation and comparison system")
    print("=" * 50)
    print("This system implements the following:")
    print("1. Improved BP neural network vs. standard BP network")
    print("2. Effect of feature extraction (PCA, feature selection) on classification performance")
    print("3. Comparison of several classification algorithms (naive Bayes, KNN, decision tree)")
    print("4. Ensemble learning algorithms (Bagging, Voting)")
    print("5. All algorithms are implemented from scratch, without any ML libraries")
    print("=" * 50)
    # Matplotlib font configuration (SimHei enables CJK characters in figures)
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    # Run the experiments
    runner = ExperimentRunner()
    results = runner.run_all_experiments()
    print("\nExperiments finished! Results are summarised in the experiment report.")
    print("Figures have been saved to local files.")


if __name__ == "__main__":
    main()


@@ -1,72 +0,0 @@
import numpy as np
import pandas as pd
from typing import Tuple, List
import math


def load_data(filepath: str) -> Tuple[np.ndarray, np.ndarray]:
    """Load a CSV dataset (features in all but the last column, labels in the last)."""
    data = pd.read_csv(filepath)
    X = data.iloc[:, :-1].values
    y = data.iloc[:, -1].values
    return X, y


def train_test_split(X: np.ndarray, y: np.ndarray, test_size: float = 0.3, random_state: int = 42) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Split a dataset into training and test sets."""
    np.random.seed(random_state)
    n_samples = X.shape[0]
    n_test = int(n_samples * test_size)
    indices = np.random.permutation(n_samples)
    test_indices = indices[:n_test]
    train_indices = indices[n_test:]
    return X[train_indices], X[test_indices], y[train_indices], y[test_indices]


def normalize_data(X_train: np.ndarray, X_test: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Standardise features using the training-set mean and standard deviation."""
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    std[std == 0] = 1  # avoid division by zero
    X_train_norm = (X_train - mean) / std
    X_test_norm = (X_test - mean) / std
    return X_train_norm, X_test_norm


def accuracy_score(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Classification accuracy."""
    return np.mean(y_true == y_pred)


def confusion_matrix(y_true: np.ndarray, y_pred: np.ndarray) -> np.ndarray:
    """Confusion matrix."""
    classes = np.unique(np.concatenate([y_true, y_pred]))
    n_classes = len(classes)
    matrix = np.zeros((n_classes, n_classes), dtype=int)
    for i, true_class in enumerate(classes):
        for j, pred_class in enumerate(classes):
            matrix[i, j] = np.sum((y_true == true_class) & (y_pred == pred_class))
    return matrix


def cross_validation(classifier, X: np.ndarray, y: np.ndarray, k: int = 5) -> List[float]:
    """K-fold cross-validation; returns the accuracy of each fold."""
    n_samples = X.shape[0]
    fold_size = n_samples // k
    scores = []
    for i in range(k):
        start_idx = i * fold_size
        end_idx = start_idx + fold_size if i < k - 1 else n_samples
        test_indices = np.arange(start_idx, end_idx)
        train_indices = np.concatenate([np.arange(0, start_idx), np.arange(end_idx, n_samples)])
        X_train_fold, X_test_fold = X[train_indices], X[test_indices]
        y_train_fold, y_test_fold = y[train_indices], y[test_indices]
        classifier.fit(X_train_fold, y_train_fold)
        y_pred = classifier.predict(X_test_fold)
        scores.append(accuracy_score(y_test_fold, y_pred))
    return scores
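
A usage sketch for the utility pipeline (illustrative only, not part of the commit): split, standardise, fit a classifier, then score it with the helpers above. The toy data are arbitrary; the modules are assumed to be importable as utils and classifiers, matching the imports in experiments.py.

import numpy as np
from utils import train_test_split, normalize_data, accuracy_score, confusion_matrix, cross_validation
from classifiers import KNNClassifier

rng = np.random.default_rng(0)
X = np.vstack([rng.normal(0, 1, (60, 3)), rng.normal(2, 1, (60, 3))])
y = np.array([0] * 60 + [1] * 60)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)
X_train, X_test = normalize_data(X_train, X_test)

clf = KNNClassifier(k=5)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print("accuracy:", accuracy_score(y_test, y_pred))
print("confusion matrix:\n", confusion_matrix(y_test, y_pred))
print("5-fold CV scores:", cross_validation(KNNClassifier(k=5), X_train, y_train, k=5))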