K-近邻算法概述

Yu Ching San / 2023-06-18


引言 #

本文分享第一个机器学习算法:K-近邻算法(K-Nearest Neighbor, KNN),最初由Cover和Hart于1968年提出,是最简单的机器学习算法之一。本文将从以下几个方面对K-近邻算法进行分享:首先,我将分享K-近邻算法的基本理论概念;其次,本文将使用Python实现一个实战案例;最后,我将分享一些K-近邻算法常见的面试问题。

算法概述 #

K-近邻算法是通过测量不同特征值之间的距离方法进行分类。它的工作原理是:存在一个样本数据集合,也称为训练样本集,并且样本集中每个数据都存在标签,即已知样本集中每一数据与所属分类的对应关系。当输入没有标签的新数据后,将新数据的每个特征与样本集中数据对应的特征进行比较,然后算法提取样本集中特征最相似数据(最近邻)的分类标签。一般来说,我们只选择样本数据集中前\(k\)个最相似的数据,这就是K-近邻算法中的\(k\)出处,通常是\(k\)不大于20的整数。最后,选择\(k\)个最相似数据中出现次数最多的分类,作为新数据的分类。

K-近邻算法预测分类流程如下:

  1. 计算已知类别数据集中的点与当前点之间的距离;
  2. 按照距离递增次序排序;
  3. 选取与当前点距离最小的\(k\)个点;
  4. 确定前\(k\)个点所在类别的出现频率;
  5. 返回前\(k\)个点出现频率最高的类别作为当前点的预测分类。

其中,计算距离一般使用欧氏距离公式:

$$ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_{i=1}^{n} (x_i - y_i)^2} $$

K-近邻算法优缺点如下:

优点:

  • 简单易懂: K-近邻算法是一种直观、简单的算法,容易理解和实现;
  • 无需训练: K-近邻算法不需要训练,它可以直接利用已有的数据进行推断,因此对新数据的适应性较好;
  • 对异常值不敏感: 由于K-近邻算法是基于局部信息进行推断,它对于异常值的影响相对较小,不易受异常样本的干扰。

缺点:

  • 计算开销较大: 在进行预测时,K-近邻算法需要计算新样本与所有训练样本之间的距离,因此在处理大规模数据集时,计算开销较高,对计算资源要求较高;
  • 特征维度问题: 随着特征维度的增加,计算样本之间的距离会变得更加困难,这一问题被称为“唯独灾难”。在高维空间中,样本间的距离往往趋于相等或相差不大,导致K-近邻算法的性能下降;
  • 对不平衡数据集敏感: 当训练数据集中的类别不平衡时,样本较多的类别将对预测结果产生较大的影响,从而导致偏差。这可能会导致K-近邻算法在处理不平衡数据集时的性能下降。

注意: K-近邻算法中的\(k\)值是一个超参数,需要根据具体问题和数据进行调优。选择不合适的\(k\)值可能导致欠拟合过拟合问题。

实战案例 #

本文所使用的是一个改进约会网站配对效果的案例,原始案例来自于《机器学习实战》 一书,另外这里推荐一个写的还不错的书中项目对应的源码地址

案例概述 #

我的朋友海伦一直使用在线约会网站寻找适合自己的约会对象。尽管约会网站会推荐不同的人选,但她并不是喜欢每一个人。经过一番总结,她发现曾交往过三种类型的人:

  • 不喜欢的人
  • 魅力一般的人
  • 极具魅力的人

尽管发现了上述规律,但海伦依然无法将约会网站推荐的匹配对象归入恰当的分类。她觉得可以在周一到周五约会那些魅力一般的人,而周末则更喜欢与那些极具魅力的人为伴。海伦希望我们的分类软件可以更好地帮助她将匹配对象划分到确切的分类中。此外海伦还收集了一些约会网站未曾记录的数据信息,她认为这些数据更有助于匹配对象的归类。

这里我们就使用K-近邻算法来实现这一任务,下面是基本流程:

  1. 收集数据:提供文本文件;
  2. 准备数据:使用Python解析文本文件;
  3. 分析数据:使用Matplotlib画二维扩散图;
  4. 测试算法:使用海伦提供的部分数据作为测试样例;
  5. 使用算法:产生简单的命令行程序,然后海伦可以输入一些特征数据以判断对方是否为自己喜欢的类型。

注意: 测试样本和非测试样本的区别在于:测试样本是已经完成分类的数据,如果预测分类与实际类别不同,则标记为一个错误。

准备数据 #

海伦收集约会数据已经有了一段时间,她把这些数据存放在文本文件中,每个样本数据占据一行,总共有1000行。海伦的样本主要包含以下3种特征:

  • 每年获得的飞行常客里程数;
  • 玩视频游戏所耗时间百分比;
  • 每周消费的冰淇淋公升数。

在将上述特征数据输入到分类器之前,必须将待处理数据的格式改变为分类器可以接受的格式。下面是使用Python实现的数据读取代码:

def file2matrix(filename):
    """Read data from file and return a matrix and a vector of labels."""
    fr = open(filename, "r", encoding="utf-8")
    array_of_lines = fr.readlines()
    number_of_lines = len(array_of_lines)
    return_mat = np.zeros((number_of_lines, 3))
    class_label_vector = []

    for index, line in enumerate(array_of_lines):
        line = line.strip()
        list_from_line = line.split("\t")
        return_mat[index, :] = list_from_line[:3]
        
        if list_from_line[-1] == "didntLike":
            class_label_vector.append(1)
        elif list_from_line[-1] == "smallDoses":
            class_label_vector.append(2)
        elif list_from_line[-1] == "largeDoses":
            class_label_vector.append(3)

    return return_mat, class_label_vector

分析数据 #

我们可以通过使用Matplotlib绘制原始数据的散点图,初步分析数据的特征。下面是对应的Python代码:

def show_data(dating_data_mat, dating_labels):
    """Show data in a scatter plot."""
    font = FontProperties(fname=r"c:\windows\fonts\simsun.ttc", size=14)
    fig, axs = plt.subplots(nrows=2, ncols=2, sharex=False, sharey=False, figsize=(13, 8))
    
    labels_colors = []
    for i in dating_labels:
        if i == 1:
            labels_colors.append("black")
        elif i == 2:
            labels_colors.append("orange")
        elif i == 3:
            labels_colors.append("red")
    # 画出散点图,以第一列和第二列数据为横纵坐标       
    axs[0][0].scatter(x=dating_data_mat[:, 0], y=dating_data_mat[:, 1], color=labels_colors, s=15, alpha=.5)
    axs0_title_text = axs[0][0].set_title(u"每年获得的飞行常客里程数与玩视频游戏所消耗时间占比", fontproperties=font)
    axs0_xlabel_text = axs[0][0].set_xlabel(u"每年获得的飞行常客里程数", fontproperties=font)
    axs0_ylabel_text = axs[0][0].set_ylabel(u"玩视频游戏所消耗时间占比", fontproperties=font)
    plt.setp(axs0_title_text, size=9, weight="bold", color="red")
    plt.setp(axs0_xlabel_text, size=7, weight="bold", color="black")
    plt.setp(axs0_ylabel_text, size=7, weight="bold", color="black")
    # 画出散点图,以第一列和第三列数据为横纵坐标
    axs[0][1].scatter(x=dating_data_mat[:, 0], y=dating_data_mat[:, 2], color=labels_colors, s=15, alpha=.5)
    axs1_title_text = axs[0][1].set_title(u"每年获得的飞行常客里程数与每周消费的冰淇淋公升数", fontproperties=font)
    axs1_xlabel_text = axs[0][1].set_xlabel(u"每年获得的飞行常客里程数", fontproperties=font)
    axs1_ylabel_text = axs[0][1].set_ylabel(u"每周消费的冰淇淋公升数", fontproperties=font)
    plt.setp(axs1_title_text, size=9, weight="bold", color="red")
    plt.setp(axs1_xlabel_text, size=7, weight="bold", color="black")
    plt.setp(axs1_ylabel_text, size=7, weight="bold", color="black")
    # 画出散点图,以第二列和第三列数据为横纵坐标
    axs[1][0].scatter(x=dating_data_mat[:, 1], y=dating_data_mat[:, 2], color=labels_colors, s=15, alpha=.5)
    axs2_title_text = axs[1][0].set_title(u"玩视频游戏所消耗时间占比与每周消费的冰淇淋公升数", fontproperties=font)
    axs2_xlabel_text = axs[1][0].set_xlabel(u"玩视频游戏所消耗时间占比", fontproperties=font)
    axs2_ylabel_text = axs[1][0].set_ylabel(u"每周消费的冰淇淋公升数", fontproperties=font)
    plt.setp(axs2_title_text, size=9, weight="bold", color="red")
    plt.setp(axs2_xlabel_text, size=7, weight="bold", color="black")
    plt.setp(axs2_ylabel_text, size=7, weight="bold", color="black")
    # 设置图例
    didnt_like = mlines.Line2D([], [], color="black", marker=".", markersize=6, label="didntLike")
    small_doses = mlines.Line2D([], [], color="orange", marker=".", markersize=6, label="smallDoses")
    large_doses = mlines.Line2D([], [], color="red", marker=".", markersize=6, label="largeDoses")
    # 增加图例
    axs[0][0].legend(handles=[didnt_like, small_doses, large_doses])
    axs[0][1].legend(handles=[didnt_like, small_doses, large_doses])
    axs[1][0].legend(handles=[didnt_like, small_doses, large_doses])
    # 增加子图间的间隔
    plt.subplots_adjust(hspace=.3)

    plt.show()

绘制结果如下图所示:

image.png

由上图可见,每年赢得的飞行常客里程数与玩视频游戏所占百分比的约会数据这两个特征相对更容易区分数据点所属的类别。

数据归一化 #

数据中给定的特征的量纲并不统一,会对计算结果产生影响。因此,在处理这种不同取值范围的特征值时,我们通常采用的方法是将数值归一化,如将取值范围处理为0到1或者-1到1之间。下面的公式可以将任意取值范围的特征值转化为0到1区间内的值: $$ newValue = \frac{{oldValue - {min}}}{{{max} - {min}}} $$ 下面是Python实现数据归一化的代码:

def autoNorm(dataSet):
    """Normalize data set."""
    min_vals = dataSet.min(0) # 取每一列的最小值
    max_vals = dataSet.max(0) # 取每一列的最大值
    ranges = max_vals - min_vals # 取每一列的范围
    norm_data_set = np.zeros(np.shape(dataSet)) # 初始化归一化数据集
    m = dataSet.shape[0] # 取数据集的行数
    norm_data_set = dataSet - np.tile(min_vals, (m, 1)) # 每一列的数据减去最小值
    norm_data_set = norm_data_set / np.tile(ranges, (m, 1)) # 每一列的数据除以范围

    return norm_data_set, ranges, min_vals

分类器实现 #

基于K-近邻算法的原理,用Python实现的分类器代码如下:

def classify0(in_x, data_set, labels, k):
    """Classify data."""
    data_set_size = data_set.shape[0] # 取数据集的行数
    diff_mat = np.tile(in_x, (data_set_size, 1)) - data_set # 每一行的数据减去输入数据
    sq_diff_mat = diff_mat ** 2 # 每一行的数据平方
    sq_distances = sq_diff_mat.sum(axis=1) # 每一行的数据平方和
    distances = sq_distances ** .5 # 每一行的数据开方
    sorted_dist_indicies = distances.argsort() # 每一行的数据从小到大排序
    class_count = {} # 初始化分类字典
    for i in range(k):
        vote_i_label = labels[sorted_dist_indicies[i]] # 取前k个数据的标签
        class_count[vote_i_label] = class_count.get(vote_i_label, 0) + 1 # 统计标签出现的次数
    sorted_class_count = sorted(class_count.items(), key=operator.itemgetter(1), reverse=True) # 标签出现的次数从大到小排序

    return sorted_class_count[0][0]

测试算法 #

对于分类器来说,错误率就是分类器给出错误结果的次数除以测试数据的总数,完美分类器的错误率为0,而错误率为1.0的分类器不会给出任何正确的分类结果。代码里我们定义一个计数器变量,每次分类器错误地分类数据,计数器就加1,程序执行完成之后计数器的结果除以数据点总数即是错误率。具体的Python实现代码如下所示:

def datingClassTest():
    """
    Test dating data.
    """
    dating_data_mat, dating_labels = file2matrix("datingTestSet.txt")
    ho_ratio = .1 # 取数据集的10%作为测试集
    norm_data_set, ranges, min_vals = autoNorm(dating_data_mat) # 归一化数据集
    m = norm_data_set.shape[0] # 取数据集的行数
    num_test_vecs = int(m * ho_ratio) # 取测试集的行数
    error_count = 0.0 # 初始化错误率

    for i in range(num_test_vecs):
        classifier_result = classify0(norm_data_set[i, :], norm_data_set[num_test_vecs:m, :], dating_labels[num_test_vecs:m], 3)
        print("the classifier came back with: %d, the real answer is: %d" % (classifier_result, dating_labels[i]))
        if classifier_result != dating_labels[i]:
            error_count += 1.0
            
    print("the total error rate is: %f" % (error_count / float(num_test_vecs)))

执行分类器测试程序,我们将得到下面的输出结果:

the classifier came back with: 3, the real answer is: 3
the classifier came back with: 2, the real answer is: 2 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 3, the real answer is: 3 
the classifier came back with: 3, the real answer is: 3 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 3, the real answer is: 3 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 1, the real answer is: 1
...
the classifier came back with: 2, the real answer is: 2 
the classifier came back with: 1, the real answer is: 1 
the classifier came back with: 3, the real answer is: 1 
the total error rate is: 0.050000

使用算法 #

上面我们已经在数据上对分类器进行了测试,现在终于可以使用这个分类器为海伦来对人们分类。下面是使用Python实现的代码:

def classifyPerson():
    """Classify person."""
    result_list = ["not at all", "in small doses", "in large doses"]
    percent_tats = float(input("percentage of time spent playing video games?"))
    ff_miles = float(input("frequent flier miles earned per year?"))
    ice_cream = float(input("liters of ice cream consumed per year?"))
    dating_data_mat, dating_labels = file2matrix("datingTestSet.txt")
    norm_data_set, ranges, min_vals = autoNorm(dating_data_mat)
    in_arr = np.array([ff_miles, percent_tats, ice_cream])
    classifier_result = classify0((in_arr - min_vals) / ranges, norm_data_set, dating_labels, 3)
    print("You will probably like this person: ", result_list[classifier_result - 1])

sklearn实现 #

针对于上文中的案例,这里使用Python的sklearn库进行实现,具体代码如下所示:

from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

# 读取数据
dating_data_mat = pd.read_table("datingTestSet.txt", header=None, names=["ff_miles", "percent_tats", "ice_cream", "labels"])
# 进行归一化
min_max_scaler = MinMaxScaler()
norm_data_set = min_max_scaler.fit_transform(dating_data_mat.iloc[:, :-1])
# 基于sklearn将标签转换为数字
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(dating_data_mat.iloc[:, -1])
# 划分训练集和测试集
train_data, test_data, train_labels, test_labels = train_test_split(norm_data_set, labels, test_size=.1, random_state=2020)

# 构建模型
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(train_data, train_labels)
# 测试模型
predict = knn.predict(test_data)

# 计算测试准确率
accuracy = accuracy_score(test_labels, predict)
print("accuracy: ", accuracy)

# 预测新数据
new_data = np.array([[26052, 1.441871, 0.805124]])
new_data = min_max_scaler.transform(new_data)
new_predict = knn.predict(new_data)
# 将标签转换为原始标签
new_predict = label_encoder.inverse_transform(new_predict)
print("You will probably like this person:", new_predict[0])

常见问题 #

  • 简述一下KNN算法的原理?(参考答案见上文)
  • KNN算法有哪些优点和缺点?(参考答案见上文)
  • KNN算法的三要素?

参考答案

K近邻(K-Nearest Neighbors,KNN)算法是一种基本的分类和回归算法。它的三个要素是:距离度量、K值的选择和分类决策规则。

(1)距离度量:KNN算法通过计算样本之间的距离来确定它们的相似性。常用的距离度量方法包括欧氏距离、曼哈顿距离、闵可夫斯基距离等。选择合适的距离度量方法对算法的性能影响很大。(2)K值的选择:K值代表选择最近邻的样本数量。在KNN算法中,为了确定一个未知样本的类别,它会找到距离最近的K个邻居样本,并根据这些邻居样本的类别进行投票或计算平均值来决定未知样本的类别。选择合适的K值是关键,较小的K值可能会导致模型过拟合,较大的K值可能会导致模型欠拟合。(3)分类决策规则:KNN算法的分类决策规则是基于邻居样本的类别进行的。对于分类任务,常用的决策规则是多数表决,即将K个邻居样本中出现最多次数的类别作为未知样本的类别。对于回归任务,通常采用平均值决策规则,即将K个邻居样本的输出值的平均值作为未知样本的输出值。

这三个要素是KNN算法的关键部分,它们共同决定了KNN算法的性能和表现。选择合适的距离度量方法、K值和分类决策规则对于获得准确的预测结果非常重要。

参考文献 #

[1] Cover T, Hart P. Nearest neighbor pattern classification[J]. IEEE transactions on information theory, 1967, 13(1): 21-27.

[2] Harrington, P. (2012). Machine Learning in Action. Shelter Islan d, NY: Manning Publications.

最后一次修改于 2023-07-23