微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

机器算法从入门到实战及面试

第一章、机器学习算法入门

img


一、什么是机器学习

机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。它是人工智能的核心,是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域,它主要使用归纳、综合而不是演绎。一种经常引用的英文定义是:

A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E。

 

img

 

机器学习强调三个关键词:算法、经验、性能,其处理过程如上图所示。在数据的基础上,通过算法构建出模型并对模型进行评估。评估的性能如果达到要求,就用该模型来测试其他的数据;如果达不到要求,就要调整算法来重新建立模型,再次进行评估。如此循环往复,最终获得满意的经验来处理其他的数据。机器学习技术和方法已经被成功应用到多个领域,比如个性推荐系统,金融反欺诈,语音识别,自然语言处理和机器翻译,模式识别,智能控制等。

二、基于大数据的机器学习

传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用。即以前的统计/机器学习依赖于数据抽样。但实际过程中样本往往很难做好随机,导致学习的模型不是很准确,在测试数据上的效果也可能不太好。

随着 HDFS(Hadoop distributed File System) 等分布式文件系统出现,存储海量数据已经成为可能。在全量数据上进行机器学习也成为了可能,这顺便也解决统计随机性的问题。然而,由于 MapReduce 自身的限制,使得使用 MapReduce 来实现分布式机器学习算法非常耗时和消耗磁盘IO。因为通常情况下机器学习算法参数学习的过程都是迭代计算的,即本次计算的结果要作为下一次迭代的输入,这个过程中,如果使用 MapReduce,我们只能把中间结果存储磁盘,然后在下一次计算的时候从新读取,这对于迭代 频发的算法显然是致命的性能瓶颈。

在大数据上进行机器学习,需要处理全量数据并进行大量的迭代计算,这要求机器学习平台具备强大的处理能力。Spark 立足于内存计算,天然的适应于迭代式计算。即便如此,对于普通开发者来说,实现一个分布式机器学习算法仍然是一件极具挑战的事情。

幸运的是,Spark提供了一个基于海量数据的机器学习库,它提供了常用机器学习算法的分布式实现,开发者只需要有 Spark 基础并且了解机器学习算法的原理,以及方法相关参数的含义,就可以轻松的通过调用相应的 API 来实现基于海量数据的机器学习过程。

其次,Spark-Shell的即席查询也是一个关键。算法工程师可以边写代码边运行,边看结果。spark提供的各种高效的工具正使得机器学习过程更加直观便捷。比如通过sample函数,可以非常方便的进行抽样。

当然,Spark发展到后面,拥有了实时批计算,批处理,算法库,sql、流计算等模块等,基本可以看做是全平台的系统。把机器学习作为一个模块加入到Spark中,也是大势所趋。

三、Spark 机器学习库MLLib

MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容

机器学习算法:常用的学习算法,如分类、回归、聚类和协同过滤;

特征处理:特征提取、转化、降维,和特征选择;

管道(Pipeline):用于构建、评估和调整机器学习管道的工具;

存储:保存和加载算法,模型和管道;

实用工具:线性代数,统计,数据处理等工具。

Spark 机器学习库从 1.2 版本以后被分为两个包:

spark.mllib包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。

spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。

使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。这种方式给我们提供了更灵活的方法,更符合机器学习过程的特点,也更容易从其他语言迁移。Spark官方推荐使用spark.ml。如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。开发者需要注意的是,从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。

Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:

 

 

img

第二章、机器学习算法代码实现

1、传统机器学习算法及代码实现

一、机器学习算法及代码实现–决策树

1、决策树

决策树算法的核心在于决策树的构建,每次选择让整体数据香农熵(描述数据的混乱程度)减小最多的特征,使用其特征值对数据进行划分,每次消耗一个特征,不断迭代分类,直到所有特征消耗完(选择剩下数据中出现次数最多的类别作为这堆数据的类别),或剩下的数据全为同一类别,不必继续划分,至此决策树构建完成,之后我们依照这颗决策树对新进数据进行分类

这里写图片描述

2、信息熵

一条信息的信息量大小和它的不确定性有直接的关系,要搞清楚一件非常非常不确定的事情,或者是我们一无所知的事情,需要了解大量信息==>信息量的度量就等于不确定性的多少 例子:猜世界杯冠军,假如一无所知,猜多少次?实际中每个队夺冠的几率不是相等的,如果我们对其有足够了解,是否猜中的概率会增大? 信息熵用比特(bit)来衡量信息的多少,变量的不确定性越大,熵也就越大。 公式:

这里写图片描述

3、决策树算法(ID3)

我们以一个例子来讲述决策树的算法(判断该用户是否买电脑)

这里写图片描述

每次选择信息获取量最大的特征对其进行划分 Gain(A) = Info(D) - Infor_A(D) (原来的信息熵减去用A之后的信息熵=》获取的信息量)

计算过程:

这里写图片描述

类似,Gain(income) = 0.029, Gain(student) = 0.151, Gain(credit_rating)=0.048 所以,选择age作为第一个根节点 分类结果:

这里写图片描述

算法注意点: 1)根节点开始,样本在同一个类则为树叶,标记类号 2)选择信息获取量最大的进行划分 3)属性为离散值连续则必须离散化 4)根据属性划分分支,分支的子节点不用再考虑该属性 停止条件 1)所有节点属于同一类 2)没有可划分的属性了: 以当中的大多数来确定类 3)属性下没节点:以父节点中的多少类作为类

4、其它决策树算法

C4.5、CART

5、对于过拟合的处理方法

先剪枝:一定深度不再分 后剪枝:先生成,后按规则减

6、优缺点

优点:直观、易理解、小规模数据有效 缺点:处理连续变量不好,值域不好划分 类别多时,错误增加快 可规模性一般,大量数据时复杂性大

7、算法实现
#-*- coding: utf-8 -*-
from sklearn.feature_extraction import DictVectorizer
import csv
from sklearn import preprocessing
from sklearn import tree
from sklearn.externals.six import StringIO

data = open('jueceshu.csv', 'rb')
reader = csv.reader(data)
headers = reader.next()
print headers

featureList = []  # 特征集
labelList = []  # 标签
for row in reader:
   # 最后一列是标签,构造标签
   labelList.append(row[len(row)-1])
   # 构造特征集
   rowDict = {}
   for i in range(1, len(row)-1):
       # header里面是属性名,用来作键值
       rowDict[headers[i]] = row[i]
   featureList.append(rowDict)

print featureList

vec = DictVectorizer()
# 将特征转化为向量
dummyX = vec.fit_transform(featureList).toarray()

print ('dummyX:'+str(dummyX))
# 输出向量中每一项的含义
print vec.get_feature_names()

print 'labelList:' + str(labelList)

# 将标签变成列向量
lb = preprocessing.LabelBinarizer()
dummyY = lb.fit_transform(labelList)
print 'dummyY:' + str(dummyY)

# 利用tree中的分类器来创建决策树
clf = tree.DecisionTreeClassifier(criterion='entropy')  # 用ID3的算法 信息熵
clf = clf.fit(dummyX, dummyY)
print 'clf:' + str(clf)

# 画决策树
with open('jueceshu.dot', 'w') as f:
   # 把feature_name返回
   f = tree.export_graphviz(clf,feature_names=vec.get_feature_names(), out_file=f)

oneRowX = dummyX[0, :]
print 'oneRowX:' + str(oneRowX)

# 构造新的情况,并预测
newRowX = oneRowX
newRowX[0] = 1
newRowX[2] = 0
print 'newRowX:' + str(newRowX)

# 用模型预测
predictedY = clf.predict(newRowX)
print 'predictedY:' + str(predictedY)

二、机器学习算法及代码实现–K邻近算法

1、K邻近算法

将标注好类别的训练样本映射到X(选取的特征数)维的坐标系之中,同样将测试样本映射到X维的坐标系之中,选取距离该测试样本欧氏距离(两点间距离公式)最近的k个训练样本,其中哪个训练样本类别占比最大,我们就认为它是该测试样本所属的类别。

这里写图片描述

2、算法步骤:
 1)为了判断未知实例的类别,以所有已知类别的实例作为参照
2)选择参数K
3)计算未知实例与所有已知实例的距离
4)选择最近K个已知实例
5)根据少数服从多数的投票法则(majority-voting),让未知实例归类为K个最邻近样本中最多数的类别
3、距离

Euclidean distance 定义 其他距离衡量:余弦值(cos), 相关度 (correlation), 曼哈顿距离 (Manhattan distance)

这里写图片描述

其他距离衡量:余弦值(cos), 相关度 (correlation), 曼哈顿距离 (Manhattan distance)

4、例子

这里写图片描述

求距G点最近的k点中哪一类点最多,就可以预测G点类型。

5、算法优缺点:

优点 1)简单 2)易于理解 3)容易实现 4)通过对K的选择可具备丢噪音数据的健壮性

缺点


    1)需要大量空间储存所有已知实例
    2)算法复杂度高(需要比较所有已知实例与要分类的实例)
    3) 当其样本分布不平衡时,比如其中一类样本过大(实例数量过多)占主导的时候,新的未知实例容易被归类为这个主导样本,因为这类样本实例的数量过大,但这个新的未知实例实际并木接近目标样本

 

6、 改进版本
  考虑距离,根据距离加上权重
比如: 1/d (d: 距离)
7、代码
# -*- coding: utf-8 -*-
from sklearn import neighbors
from sklearn import datasets
# 调用knn分类
knn = neighbors.KNeighborsClassifier()
# 导入数据集
iris = datasets.load_iris()

print iris

# 训练
knn.fit(iris.data, iris.target)

# 预测
predictedLabel = knn.predict([[0.1, 0.2, 0.3, 0.4]])
print 'predictedLabel:'
print predictedLabel

三、机器学习算法及代码实现–支持向量机

1、支持向量机

SVM希望通过N-1维的分隔超平面线性分开N维的数据,距离分隔超平面最近的点被叫做支持向量,我们利用SMO(SVM实现方法之一)最大化支持向量到分隔面的距离,这样当新样本点进来时,其被分类正确的概率也就更大。我们计算样本点到分隔超平面的函数间隔,如果函数间隔为正,则分类正确,函数间隔为负,则分类错误函数间隔的绝对值除以||w||就是几何间隔,几何间隔始终为正,可以理解为样本点到分隔超平面的几何距离。若数据不是线性可分的,那我们引入核函数的概念,从某个特征空间到另一个特征空间的映射是通过核函数来实现的,我们利用核函数将数据从低维空间映射到高维空间,低维空间的非线性问题在高维空间往往会成为线性问题,再利用N-1维分割超平面对数据分类

这里写图片描述

2、分类

线性可分、线性不可分

3、超平面公式(先考虑线性可分)

W*X+b=0 其中W={w1,w2,,,w3},为权重向量 下面用简单的二维向量讲解(思维导图)

这里写图片描述

4、寻找超平面

这里写图片描述

5、例子

这里写图片描述

6、线性不可分

映射到高维

这里写图片描述

算法思路(思维导图)

这里写图片描述

函数举例

这里写图片描述

7、代码
# -*- coding: utf-8 -*-
from sklearn import svm

# 数据
x = [[2, 0], [1, 1], [2, 3]]
# 标签
y = [0, 0, 1]
# 线性可分的svm分类器,用线性的核函数
clf = svm.SVC(kernel='linear')
# 训练
clf.fit(x, y)
print clf

# 获得支持向量
print clf.support_vectors_

# 获得支持向量点在原数据中的下标
print clf.support_

# 获得每个类支持向量的个数
print clf.n_support_

# 预测
print clf.predict([2, 0])
# -*- coding: utf-8 -*-
import numpy as np
import pylab as pl
from sklearn import svm

np.random.seed(0)  # 值固定,每次随机结果不变
# 2组20个二维的随机数,20个0,20个1的y (20,2)20行2列
X = np.r_[np.random.randn(20, 2) - [2, 2], np.random.randn(20, 2) + [2, 2]]
Y = [0] * 20 + [1] * 20

# 训练
clf = svm.SVC(kernel='linear')
clf.fit(X, Y)


w = clf.coef_[0]
a = -w[0] / w[1]
xx = np.linspace(-5, 5)
yy = a * xx - (clf.intercept_[0] / w[1])  # 点斜式 平分的线


b = clf.support_vectors_[0]
yy_down = a* xx +(b[1] - a*b[0])
b = clf.support_vectors_[-1]
yy_up = a* xx +(b[1] - a*b[0])  # 两条虚线

print "w: ", w
print "a: ", a
# print " xx: ", xx
# print " yy: ", yy
print "support_vectors_: ", clf.support_vectors_
print "clf.coef_: ", clf.coef_

# In scikit-learn coef_ attribute holds the vectors of the separating hyperplanes for linear models. It has shape (n_classes, n_features) if n_classes > 1 (multi-class one-vs-all) and (1, n_features) for binary classification.
#
# In this toy binary classification example, n_features == 2, hence w = coef_[0] is the vector orthogonal to the hyperplane (the hyperplane is fully defined by it + the intercept).
#
# To plot this hyperplane in the 2D case (any hyperplane of a 2D plane is a 1D line), we want to find a f as in y = f(x) = a.x + b. In this case a is the slope of the line and can be computed by a = -w[0] / w[1].




# plot the line, the points, and the nearest vectors to the plane
pl.plot(xx, yy, 'k-')
pl.plot(xx, yy_down, 'k--')
pl.plot(xx, yy_up, 'k--')

pl.scatter(clf.support_vectors_[:, 0], clf.support_vectors_[:, 1],
          s=80, facecolors='none')
pl.scatter(X[:, 0], X[:, 1], c=Y, cmap=pl.cm.Paired)

pl.axis('tight')
pl.show()
# -*- coding: utf-8 -*-
from __future__ import print_function

from time import time
import logging  # 打印程序进展的信息
import matplotlib.pyplot as plt

from sklearn.cross_validation import train_test_split
from sklearn.datasets import fetch_lfw_people
from sklearn.grid_search import gridsearchcv
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.decomposition import RandomizedPCA
from sklearn.svm import SVC


print(__doc__)

# 打印程序进展的信息
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')


###############################################################################
# 下载人脸数据集,并导入

lfw_people = fetch_lfw_people(min_faces_per_person=70, resize=0.4)

# 数据集多少,长宽多少
n_samples, h, w = lfw_people.images.shape

# x是特征向量的矩阵,获取矩阵列数,即纬度
X = lfw_people.data
n_features = X.shape[1]

# y是分类标签向量
y = lfw_people.target
# 类别里面有谁的名字
target_names = lfw_people.target_names
# 名字有多少行,即有多少人要区分
n_classes = target_names.shape[0]

# 打印
print("Total dataset size:")
print("n_samples: %d" % n_samples)
print("n_features: %d" % n_features)
print("n_classes: %d" % n_classes)


###############################################################################
# 将数据集划分为训练集和测试集,测试集占0.25
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25)


###############################################################################
# PCA降维
n_components = 150  # 组成元素数量

print("Extracting the top %d eigenfaces from %d faces"
      % (n_components, X_train.shape[0]))
t0 = time()
# 建立PCA模型
pca = RandomizedPCA(n_components=n_components, whiten=True).fit(X_train)
print("done in %0.3fs" % (time() - t0))

# 提取特征脸
eigenfaces = pca.components_.reshape((n_components, h, w))

print("Projecting the input data on the eigenfaces orthonormal basis")
t0 = time()
# 将特征向量转化为低维矩阵
X_train_pca = pca.transform(X_train)
X_test_pca = pca.transform(X_test)
print("done in %0.3fs" % (time() - t0))


###############################################################################
# Train a SVM classification model

print("Fitting the classifier to the training set")
t0 = time()
# C错误惩罚权重 gamma 建立核函数的不同比例
param_grid = {'C': [1e3, 5e3, 1e4, 5e4, 1e5],
              'gamma': [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.1], }
# 选择核函数,建SVC,尝试运行,获得最好参数
clf = gridsearchcv(SVC(kernel='rbf', class_weight='auto'), param_grid)
# 训练
clf = clf.fit(X_train_pca, y_train)
print("done in %0.3fs" % (time() - t0))
print("Best estimator found by grid search:")
print(clf.best_estimator_)  # 输出最佳参数


###############################################################################
# Quantitative evaluation of the model quality on the test set

print("Predicting people's names on the test set")
t0 = time()
# 预测
y_pred = clf.predict(X_test_pca)
print("done in %0.3fs" % (time() - t0))

print(classification_report(y_test, y_pred, target_names=target_names))  # 与真实情况作对比求置信度
print(confusion_matrix(y_test, y_pred, labels=range(n_classes)))  # 对角线的为预测正确的,a预测为a


###############################################################################
# Qualitative evaluation of the predictions using matplotlib

def plot_gallery(images, titles, h, w, n_row=3, n_col=4):
    """Helper function to plot a gallery of portraits"""
    plt.figure(figsize=(1.8 * n_col, 2.4 * n_row))
    plt.subplots_adjust(bottom=0, left=.01, right=.99, top=.90, hspace=.35)
    for i in range(n_row * n_col):
        plt.subplot(n_row, n_col, i + 1)
        plt.imshow(images[i].reshape((h, w)), cmap=plt.cm.gray)
        plt.title(titles[i], size=12)
        plt.xticks(())
        plt.yticks(())


# plot the result of the prediction on a portion of the test set

def title(y_pred, y_test, target_names, i):
    pred_name = target_names[y_pred[i]].rsplit(' ', 1)[-1]
    true_name = target_names[y_test[i]].rsplit(' ', 1)[-1]
    return 'predicted: %s\ntrue:      %s' % (pred_name, true_name)

prediction_titles = [title(y_pred, y_test, target_names, i)
                     for i in range(y_pred.shape[0])]

plot_gallery(X_test, prediction_titles, h, w)  # 画出测试集和它的title

# plot the gallery of the most significative eigenfaces

eigenface_titles = ["eigenface %d" % i for i in range(eigenfaces.shape[0])]
plot_gallery(eigenfaces, eigenface_titles, h, w)  # 打印特征脸

plt.show()  # 显示

四、机器学习算法及代码实现–神经网络

1、神经网络

神经网络是一种运算模型,由大量的节点(或称神经元)之间相互联接构成。每个节点代表一种特定的输出函数,称为激励函数(activation function)。每两个节点间的连接都代表一个对于通过该连接信号的加权值,称之为权重,这相当于人工神经网络的记忆。网络的输出则依网络的连接方式,权重值和激励函数的不同而不同。而网络自身通常都是对自然界某种算法或者函数的逼近,也可能是对一种逻辑策略的表达。

这里写图片描述

2、多层向前神经网络

这里写图片描述

3、设计神经网络结构

这里写图片描述

4、反向回馈算法

这里写图片描述

这里写图片描述

5、实例

这里写图片描述

这里写图片描述

6、代码
import numpy as np

def tanh(x):  
    return np.tanh(x)

def tanh_deriv(x):  
    return 1.0 - np.tanh(x)*np.tanh(x)

def logistic(x):  
    return 1/(1 + np.exp(-x))

def logistic_derivative(x):  
    return logistic(x)*(1-logistic(x))



class NeuralNetwork:   
    def __init__(self, layers, activation='tanh'):  
        """  
        :param layers: A list containing the number of units in each layer.
        Should be at least two values  
        :param activation: The activation function to be used. Can be
        "logistic" or "tanh"  
        """  
        if activation == 'logistic':  
            self.activation = logistic  
            self.activation_deriv = logistic_derivative  
        elif activation == 'tanh':  
            self.activation = tanh  
            self.activation_deriv = tanh_deriv

        self.weights = []  
        for i in range(1, len(layers) - 1):  
            self.weights.append((2*np.random.random((layers[i - 1] + 1, layers[i] + 1))-1)*0.25)  
            self.weights.append((2*np.random.random((layers[i] + 1, layers[i + 1]))-1)*0.25)


    def fit(self, X, y, learning_rate=0.2, epochs=10000):         
        X = np.atleast_2d(X)         
        temp = np.ones([X.shape[0], X.shape[1]+1])         
        temp[:, 0:-1] = X  # adding the bias unit to the input layer         
        X = temp         
        y = np.array(y)

        for k in range(epochs):  
            i = np.random.randint(X.shape[0])  
            a = [X[i]]

            for l in range(len(self.weights)):  #going forward network, for each layer
                a.append(self.activation(np.dot(a[l], self.weights[l])))  #Computer the node value for each layer (O_i) using activation function
            error = y[i] - a[-1]  #Computer the error at the top layer
            deltas = [error * self.activation_deriv(a[-1])] #For output layer, Err calculation (delta is updated error)

            #Staring backprobagation
            for l in range(len(a) - 2, 0, -1): # we need to begin at the second to last layer 
                #Compute the updated error (i,e, deltas) for each node going from top layer to input layer 
                deltas.append(deltas[-1].dot(self.weights[l].T)*self.activation_deriv(a[l]))  
            deltas.reverse()  
            for i in range(len(self.weights)):  
                layer = np.atleast_2d(a[i])  
                delta = np.atleast_2d(deltas[i])  
                self.weights[i] += learning_rate * layer.T.dot(delta)


    def predict(self, x):         
        x = np.array(x)         
        temp = np.ones(x.shape[0]+1)         
        temp[0:-1] = x         
        a = temp         
        for l in range(0, len(self.weights)):             
            a = self.activation(np.dot(a, self.weights[l]))         
        return

 

简单非线性关系数据集测试(XOR):

from NeuralNetwork import NeuralNetwork
import numpy as np

nn = NeuralNetwork([2,2,1], 'tanh')     
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])     
y = np.array([0, 1, 1, 0])     
nn.fit(X, y)     
for i in [[0, 0], [0, 1], [1, 0], [1,1]]:    
    print(i, nn.predict(i))

 

手写数字识别

每个图片8x8 识别数字:0,1,2,3,4,5,6,7,8,9

import numpy as np 
from sklearn.datasets import load_digits 
from sklearn.metrics import confusion_matrix, classification_report 
from sklearn.preprocessing import LabelBinarizer 
from NeuralNetwork import NeuralNetwork
from sklearn.cross_validation import train_test_split


digits = load_digits()  
X = digits.data  
y = digits.target  
X -= X.min() # normalize the values to bring them into the range 0-1  
X /= X.max()

nn = NeuralNetwork([64,100,10],'logistic')  
X_train, X_test, y_train, y_test = train_test_split(X, y)  
labels_train = LabelBinarizer().fit_transform(y_train)  
labels_test = LabelBinarizer().fit_transform(y_test)
print "start fitting"
nn.fit(X_train,labels_train,epochs=3000)  
predictions = []  
for i in range(X_test.shape[0]):  
    o = nn.predict(X_test[i] )  
    predictions.append(np.argmax(o))  
print confusion_matrix(y_test,predictions)  
print classification_report(y_test,predictions)
2、 深度学习算法及代码实现

一、感知器-从零开始学深度学习


未来将是人工智能和大数据的时代,是各行各业使用人工智能在云上处理大数据的时代,深度学习将是新时代的一大利器,在此我将从零开始记录深度学习的学习历程。

我希望在学习过程中做到以下几点:

  • 了解各种神经网络设计原理。

  • 掌握各种深度学习算法的Python编程实现。

  • 运用深度学习解决实际问题。

让我们开始踏上深度度学习的征程。


一、感知器原型

想要了解“神经网络”,我们需要了解一种叫做“感知器”的⼈⼯神经元。感知器在 20 世纪五、六⼗年代由科学家 Frank Rosenblatt 发明,⼀个感知器接受⼏个输⼊,并产⽣⼀个输出

下图是一个感知器:

img

⽰例中的感知器有三个输⼊x1、x2、x3(1w0作为偏置,后面会讲到)。通常可以有更多或更少输⼊。 Rosenblatt 提议⼀个简单的规则来计算输出。他引⼊权重w1、w2、w3..表⽰相应输⼊对于输出重要性的实数(权重)。神经元的输出为0 或者 1,则由计算权重后的总和 ∑jwjxj∑jwjxj ⼩于或者⼤于⼀些阈值决定。和权重⼀样, 阈值是⼀个实数,⼀个神经元的参数。⽤更精确的代数形式: 这就是⼀个感知器要做的所有事情! 而我们把阖值移动到不等式左边,并用感知器的偏置b=-threshold代替,用偏置而不用阖值。其中实现偏置的一种方法就是如前图所示在输入中引入一个偏置神经元x0=1,则b=x0w0,那么感知器的规则可以重写为:

此时就可以使用阶跃函数来作为感知器的激励函数

到此我们可以发现,一个感知器由以下几部分组成

模型的建立是运用深度学习方法解决问题的基础。

二、感知器的运用
1、感知器实现逻辑运算

我们设计一个感知器,让它来实现and运算。程序员都知道,and是一个二元函数(带有两个参数和),下面是它的真值表:

x1x1x2x2yy
0 0 0
0 1 0
1 0 0
1 1 1

为了计算方便,我们用0表示false,用1表示true。

img

 

可以看到感知器本身是一个线性分类,它通过求考虑了权重的各输入之和与阖值的大小关系,对事物进行分类

 

所以任何线性分类或线性回归问题都可以用感知器解决。前面的布尔运算可以看作是二分类问题,即给定一个输入,输出0(属于分类0)或1(属于分类1)。 如下面所示,and运算是一个线性分类问题,即可以用一条直线把分类0(false,红叉表示)和分类1(true,绿点表示)分开。

img

然而,感知器却不能实现异或运算,如下图所示,异或运算不是线性的,你无法用一条直线把分类0和分类1分开。

img

2、感知器的训练

img

三、python实现感知器

class Perceptron(object):
    def __init__(self, input_num, activator):
        '''
        初始化感知器,设置输入参数的个数,以及激活函数。
        激活函数的类型为double -> double
        '''
        self.activator = activator
        # 权重向量初始化为0
        self.weights = [0.0 for _ in range(input_num)]
        # 偏置项初始化为0
        self.bias = 0.0
    def __str__(self):
        '''
        打印学习到的权重、偏置项
        '''
        return 'weights\t:%s\nbias\t:%f\n' % (self.weights, self.bias)
    def predict(self, input_vec):
        '''
        输入向量,输出感知器的计算结果
        '''
        # 把input_vec[x1,x2,x3...]和weights[w1,w2,w3,...]打包在一起
        # 变成[(x1,w1),(x2,w2),(x3,w3),...]
        # 然后利用map函数计算[x1*w1, x2*w2, x3*w3]
        # 最后利用reduce求和
        return self.activator(
            reduce(lambda a, b: a + b,
                   map(lambda (x, w): x * w,  
                       zip(input_vec, self.weights))
                , 0.0) + self.bias)
    def train(self, input_vecs, labels, iteration, rate):
        '''
        输入训练数据:一组向量、与每个向量对应的label;以及训练轮数、学习率
        '''
        for i in range(iteration):
            self._one_iteration(input_vecs, labels, rate)
    def _one_iteration(self, input_vecs, labels, rate):
        '''
        一次迭代,把所有的训练数据过一遍
        '''
        # 把输入和输出打包在一起,成为样本的列表[(input_vec, label), ...]
        # 而每个训练样本是(input_vec, label)
        samples = zip(input_vecs, labels)
        # 对每个样本,按照感知器规则更新权重
        for (input_vec, label) in samples:
            # 计算感知器在当前权重下的输出
            output = self.predict(input_vec)
            # 更新权重
            self._update_weights(input_vec, output, label, rate)
    def _update_weights(self, input_vec, output, label, rate):
        '''
        按照感知器规则更新权重
        '''
        # 把input_vec[x1,x2,x3,...]和weights[w1,w2,w3,...]打包在一起
        # 变成[(x1,w1),(x2,w2),(x3,w3),...]
        # 然后利用感知器规则更新权重
        delta = label - output
        self.weights = map(
            lambda (x, w): w + rate * delta * x,
            zip(input_vec, self.weights))
        # 更新bias
        self.bias += rate * delta
def f(x):
    '''
    定义激活函数f
    '''
    return 1 if x > 0 else 0
def get_training_dataset():
    '''
    基于and真值表构建训练数据
    '''
    # 构建训练数据
    # 输入向量列表
    input_vecs = [[1,1], [0,0], [1,0], [0,1]]
    # 期望的输出列表,注意要与输入一一对应
    # [1,1] -> 1, [0,0] -> 0, [1,0] -> 0, [0,1] -> 0
    labels = [1, 0, 0, 0]
    return input_vecs, labels    
def train_and_perceptron():
    '''
    使用and真值表训练感知器
    '''
    # 创建感知器,输入参数个数为2(因为and是二元函数),激活函数为f
    p = Perceptron(2, f)
    # 训练,迭代10轮, 学习速率为0.1
    input_vecs, labels = get_training_dataset()
    p.train(input_vecs, labels, 10, 0.1)
    #返回训练好的感知器
    return p
if __name__ == '__main__': 
    # 训练and感知器
    and_perception = train_and_perceptron()
    # 打印训练获得的权重
    print and_perception
    # 测试
    print '1 and 1 = %d' % and_perception.predict([1, 1])
    print '0 and 0 = %d' % and_perception.predict([0, 0])
    print '1 and 0 = %d' % and_perception.predict([1, 0])
    print '0 and 1 = %d' % and_perception.predict([0, 1])

第三章、机器学习算法应用实战

第四章、spark机器学习算法

一、Spark MLlib介绍

img

一、什么是机器学习

机器学习(Machine Learning, ML)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。它是人工智能的核心,是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域,它主要使用归纳、综合而不是演绎。一种经常引用的英文定义是:

A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E。

img

机器学习强调三个关键词:算法、经验、性能,其处理过程如上图所示。在数据的基础上,通过算法构建出模型并对模型进行评估。评估的性能如果达到要求,就用该模型来测试其他的数据;如果达不到要求,就要调整算法来重新建立模型,再次进行评估。如此循环往复,最终获得满意的经验来处理其他的数据。机器学习技术和方法已经被成功应用到多个领域,比如个性推荐系统,金融反欺诈,语音识别,自然语言处理和机器翻译,模式识别,智能控制等。

二、基于大数据的机器学习

传统的机器学习算法,由于技术和单机存储的限制,只能在少量数据上使用。即以前的统计/机器学习依赖于数据抽样。但实际过程中样本往往很难做好随机,导致学习的模型不是很准确,在测试数据上的效果也可能不太好。

随着 HDFS(Hadoop distributed File System) 等分布式文件系统出现,存储海量数据已经成为可能。在全量数据上进行机器学习也成为了可能,这顺便也解决统计随机性的问题。然而,由于 MapReduce 自身的限制,使得使用 MapReduce 来实现分布式机器学习算法非常耗时和消耗磁盘IO。因为通常情况下机器学习算法参数学习的过程都是迭代计算的,即本次计算的结果要作为下一次迭代的输入,这个过程中,如果使用 MapReduce,我们只能把中间结果存储磁盘,然后在下一次计算的时候从新读取,这对于迭代 频发的算法显然是致命的性能瓶颈。

在大数据上进行机器学习,需要处理全量数据并进行大量的迭代计算,这要求机器学习平台具备强大的处理能力。Spark 立足于内存计算,天然的适应于迭代式计算。即便如此,对于普通开发者来说,实现一个分布式机器学习算法仍然是一件极具挑战的事情。

幸运的是,Spark提供了一个基于海量数据的机器学习库,它提供了常用机器学习算法的分布式实现,开发者只需要有 Spark 基础并且了解机器学习算法的原理,以及方法相关参数的含义,就可以轻松的通过调用相应的 API 来实现基于海量数据的机器学习过程。

其次,Spark-Shell的即席查询也是一个关键。算法工程师可以边写代码边运行,边看结果。spark提供的各种高效的工具正使得机器学习过程更加直观便捷。比如通过sample函数,可以非常方便的进行抽样。

当然,Spark发展到后面,拥有了实时批计算,批处理,算法库,sql、流计算等模块等,基本可以看做是全平台的系统。把机器学习作为一个模块加入到Spark中,也是大势所趋。

三、Spark 机器学习库MLLib

MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容

机器学习算法:常用的学习算法,如分类、回归、聚类和协同过滤;

特征处理:特征提取、转化、降维,和特征选择;

管道(Pipeline):用于构建、评估和调整机器学习管道的工具;

存储:保存和加载算法,模型和管道;

实用工具:线性代数,统计,数据处理等工具。

Spark 机器学习库从 1.2 版本以后被分为两个包:

spark.mllib包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算法实现都是基于原始的 RDD。

spark.ml 则提供了基于DataFrames 高层次的API,可以用来构建机器学习工作流(PipeLine)。ML Pipeline 弥补了原始 MLlib 库的不足,向用户提供了一个基于 DataFrame 的机器学习工作流式 API 套件。

使用 ML Pipeline API可以很方便的把数据处理,特征转换,正则化,以及多个机器学习算法联合起来,构建一个单一完整的机器学习流水线。这种方式给我们提供了更灵活的方法,更符合机器学习过程的特点,也更容易从其他语言迁移。Spark官方推荐使用spark.ml。如果新的算法能够适用于机器学习管道的概念,就应该将其放到spark.ml包中,如:特征提取器和转换器。开发者需要注意的是,从Spark2.0开始,基于RDD的API进入维护模式(即不增加任何新的特性),并预期于3.0版本的时候被移除出MLLib。

Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:

 

img

二、spark mlLib基本数据类型

img

MLlib支持存储在单个机器上的局部向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。 局部向量和局部矩阵是用作公共接口的简单数据模型。 底层线性代数操作由Breeze提供。 在监督学习中使用的训练示例在MLlib中称为“标记点”。


一、本地向量

局部向量具有整数类型和基于0的索引和双类型值,存储在单个机器上。 MLlib支持两种类型的局部向量:密集和稀疏。 密集向量由表示其条目值的双数组支持,而稀疏向量由两个并行数组支持:索引和值。 例如,矢量(1.0,0.0,3.0)可以以密集格式表示为[1.0,0.0,3.0],或者以稀疏格式表示为(3,[0,2],[1.0,3.0]),其中3是 矢量的大小。

> //创建一个稠密本地向量
>
> Vector v1 = Vectors.dense(0.0, 10.0, 0.5);

> //创建一个稀疏向量
>
> Vector v2 = Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3});
二、向量标签标记点)(LabeledPoint)

向量标签LabeledPoint是一种带有标签(Label/Response)的本地向量,它可以是稠密或者是稀疏的。 在MLlib中,标记点用于监督学习算法。由于标签是用双精度浮点型来存储的,故标注点类型在回归(Regression)和分类(Classification)问题上均可使用。例如,对于二分类问题,则正样本的标签为1,负样本的标签为0,而对于多类别的分类问题来说,标签则应是一个以0开始的索引序列:0, 1, 2 ...

> //创建一个标签为1.0(分类中可视为正样本)的稠密向量标注点
>
> LabeledPoint L1 = new LabeledPoint(1.0,Vectors.dense(2.0, 3.0, 3.0));

> //创建一个标签为0.0(分类中可视为负样本)的稀疏向量标注点
>
> LabeledPoint L2 = new LabeledPoint(0.0,Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3}));

在实际的机器学习问题中,稀疏向量数据是非常常见的,MLlib提供了读取LIBSVM格式数据的支持,该格式被广泛用于LIBSVM、LIBLINEAR等机器学习库。在该格式下,每一个带标注的样本点由以下格式表示:

label index1:value1 index2:value2 index3:value3 ...

其中label是该样本点的标签值,一系列index:value对则代表了该样本向量中所有非零元素的索引和元素值。这里需要特别注意的是,index是以1开始并递增的。 MLlib在org.apache.spark.mllib.util.MLUtils工具类中提供了读取LIBSVM格式的方法loadLibSVMFile,其使用非常方便。

> SparkSession spark = SparkSession.builder().appName("VectorsTest").master("local[2]").getorCreate();
>
> SparkContext sparkContext = spark.sparkContext();
>
> JavaRDD<LabeledPoint> examples=MLUtils.loadLibSVMFile(sparkContext,"data/mllib/sample_libsvm_data.txt").toJavaRDD();

> (0.0,(692,[127,128,129,130,131,154,155,156,157,158,159,181,182,183,184,185,186,187,188,189,207,208,209,210,211,212,213,214,215,216,217,235,236,237,238,239,240,241,242,243,244,245,262,263,264,265,266,267,268,269,270,271,272,273,289,290,291,292,293,294,295,296,297,300,301,302,316,317,318,319,320,321,328,329,330,343,344,345,346,347,348,349,356,357,358,371,372,373,374,384,385,386,399,400,401,412,413,414,426,427,428,429,440,441,442,454,455,456,457,466,467,468,469,470,482,483,484,493,494,495,496,497,510,511,512,520,521,522,523,538,539,540,547,548,549,550,566,567,568,569,570,571,572,573,574,575,576,577,578,594,595,596,597,598,599,600,601,602,603,604,622,623,624,625,626,627,628,629,630,651,652,653,654,655,656,657],[51.0,159.0,253.0,159.0,50...

每个标注点共有692个维,其中第127列对应的值是51.0,第128列对应的值是159.0,依此类推。

三、本地矩阵

局部矩阵具有整数类型的行和列索引以及双类型值,存储在单个机器上。 MLlib支持密集矩阵,其条目值以列主要顺序存储在单个双数组中,以及稀疏矩阵,其非零条目值以列主要顺序存储在压缩稀疏列(CSC)格式中。 例如,以下密集矩阵:

 

img

存储在具有矩阵大小(3,2)的一维阵列[1.0,3.0,5.0,2.0,4.0,6.0]中。

> //创建稠密矩阵((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
>
> Matrix d = Matrices.dense(3, 2, new double[]{1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

> //创建稀疏矩阵((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
>
> Matrix s = Matrices.sparse(3, 2, new int[]{0, 1,3}, new int[]{0, 1,1}, new double[]{9.0, 6.0,8.0});

这里,创建一个3行2列的稀疏矩阵[ [9.0,0.0], [0.0,8.0], [0.0,6.0]]。Matrices.sparse的参数中,3表示行数,2表示列数。第1个数组参数表示列指针,即每一列元素的开始索引值, 第二个数组参数表示行索引,即对应的元素是属于哪一行;第三个数组即是按列先序排列的所有非零元素,通过列指针和行索引即可判断每个元素所在的位置。比如取每个数组的第2个元素为2,1,6,表示第2列第1行的元素值是6.0。

四、分布式矩阵

分布式矩阵具有长类型的行和列索引以及双类型值,分布式地存储在一个或多个RDD中。选择正确的格式来存储大型和分布式矩阵非常重要。将分布式矩阵转换为不同的格式可能需要全局混洗,这非常昂贵。到目前为止已经实现了四种类型的分布式矩阵。

基本类型称为RowMatrix。 RowMatrix是行方向的分布式矩阵,没有有意义的行索引,例如特征向量的集合。它由行的RDD支持,其中每行是本地向量。我们假设RowMatrix的列数不是很大,因此单个本地向量可以合理地传递给驱动程序,也可以使用单个节点进行存储/操作。 IndexedRowMatrix类似于RowMatrix,但具有行索引,可用于标识行和执行连接。 CoordinateMatrix是以坐标列表(COO)格式存储的分布式矩阵,由其条目的RDD支持。 BlockMatrix是由MatrixBlock的RDD支持的分布式矩阵,它是(Int,Int,Matrix)的元组

4.1、行矩阵(RowMatrix)

RowMatrix是面向行的分布式矩阵,没有有意义的行索引,由其行的RDD支持,其中每行是本地向量。 由于每一行都由局部向量表示,因此列数受整数范围的限制,但在实践中它应该小得多。

 SparkConf conf = new SparkConf().setMaster("local").setAppName("distributedMatrixRowMatrix"); 

JavaSparkContext jsc = new JavaSparkContext(conf); 

JavaRDD<Vector> rows = jsc.parallelize(Arrays.asList(Vectors.dense(4.0,5.0,6.0),Vectors.dense(2.0,12.0,6.0)));

 RowMatrix matrix = new RowMatrix(rows.rdd()); 

System.out.println(matrix.numCols()); 

System.out.println(matrix.numRows());

 System.out.println(matrix.rows().first());

 System.out.println("行数:"+matrix.computeColumnSummaryStatistics().count()); 

System.out.println("最大向量:"+matrix.computeColumnSummaryStatistics().max());

 System.out.println("方差向量:"+matrix.computeColumnSummaryStatistics().variance()); 

System.out.println("L1范数向量:"+matrix.computeColumnSummaryStatistics().norml1());

在获得RowMatrix的实例后,我们可以通过其自带的computeColumnSummaryStatistics()方法获取该矩阵的一些统计摘要信息,并可以对其进行QR分解,SVD分解和PCA分解,这一部分内容将在特征降维的章节详细解说,这里不再叙述。

4.2、索引行矩阵(IndexedRowMatrix)

IndexedRowMatrix类似于RowMatrix但具有有意义的行索引。 它由索引行的RDD支持,因此每行由其索引(long-typed)和本地向量表示。

JavaRDD<IndexedRow> rows2 = jsc.parallelize(Arrays.asList(new IndexedRow(1, Vectors.dense(1.0, 2.3, 2.6)), new IndexedRow (2, Vectors.dense(1.0,2.3,50.6))));

IndexedRowMatrix mat2 = new IndexedRowMatrix(rows2.rdd());

 

4.3、坐标矩阵(Coordinate Matrix)

CoordinateMatrix是由其条目的RDD支持的分布式矩阵。 每个条目都是(i:Long,j:Long,value:Double)的元组,其中i是行索引,j是列索引,value是条目值。 只有当矩阵的两个维度都很大且矩阵非常稀疏时,才应使用CoordinateMatrix。

JavaRDD<MatrixEntry> rows3 = jsc.parallelize(Arrays.asList(new MatrixEntry(0,0,1.0), new MatrixEntry (1,0,2.0)));

CoordinateMatrix mat3 = new CoordinateMatrix(rows3.rdd());

 

4.4、分块矩阵(Block Matrix)

BlockMatrix是由MatrixBlocks的RDD支持的分布式矩阵,其中MatrixBlock是((Int,Int),Matrix)的元组,其中(Int,Int)是块的索引,而Matrix是子 - 给定索引处的矩阵,其大小为rowsPerBlock x colsPerBlock。 BlockMatrix支持添加和乘以另一个BlockMatrix等方法。 BlockMatrix还有一个辅助函数validate,可用于检查BlockMatrix是否设置正确。

img

JavaRDD<MatrixEntry> rows4 = jsc.parallelize(Arrays.asList(new MatrixEntry(0,0,1.0), new MatrixEntry (1,0,2.0)));

CoordinateMatrix mat = new CoordinateMatrix(rows4.rdd());

BlockMatrix matA = mat.toBlockMatrix().cache();

 

三、spark MlLib基本统计工具

img

给定一个数据集,数据分析师一般会先观察一下数据集的基本情况,称之为汇总统计或者概要性统计。一般的概要性统计用于概括一系列观测值,包括位置或集中趋势(比如算术平均值、中位数、众数和四分位均值),展型(比如四分位间距、绝对偏差和绝对距离偏差、各阶矩等),统计离差,分布的形状,依赖性等。除此之外,spark.mllib库也提供了一些其他的基本的统计分析工具,包括相关性、分层抽样、假设检验,随机生成等。

一、概括统计 summary statistics

我们通过统计学中提供的函数colStats为RDD [Vector]提供列摘要统计信息。我们可以获得每一列的最大值,最小值,均值、方差、总数。

我们用UCI 提供的莺尾花的数据来举例。 数据下载地址:http://archive.ics.uci.edu/ml/machine-learning-databases/iris/。我们将鸢尾花的四个属性,即萼片长度,萼片宽度,花瓣长度和花瓣宽度存储在observations中,类型为RDD[Vector]。

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.mllib.linalg.Vector;

import org.apache.spark.mllib.linalg.Vectors;

import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;

import org.apache.spark.mllib.stat.Statistics; 



SparkConf conf = new SparkConf().setAppName("colStatsTest").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> source = sc.textFile("data/iris.data"); // 读取数据

JavaRDD<Vector> observations = source.map(line -> {   

​    String[] parts = line.split(",");   

​    return Vectors.dense(Double.valueOf(parts[0]), Double.valueOf(parts[1]),        

​    Double.valueOf(parts[2]), Double.valueOf(parts[3]));//将RDD<String>转化为RDD<Vector>

});

MultivariateStatisticalSummary summary = Statistics.colStats(observations.rdd());  //计算列摘要统计信息。

System.out.println(summary.count()); //总数(long)

System.out.println(summary.mean()); // 包含每列平均值的密集向量(vector)

System.out.println(summary.variance());  // 列方差(vector)

System.out.println(summary.max());  // 最大值(vector)

System.out.println(summary.min());  // 最小值(vector)

System.out.println(summary.norml1());  //每列的L1范数(vector)

System.out.println(summary.norml2());  //每列的L2范数(vector)

System.out.println(summary.numNonzeros()); // 每列中的非零数(vector) 

 

 


二、相关性

计算两个数据系列之间的相关性是统计学中的常见操作。 在spark.mllib中,我们提供了计算多个系列之间成对相关性的灵活性。 支持的相关方法目前是Pearson和Spearman的相关性。

相关系数是用以反映变量之间相关关系密切程度的统计指标。简单的来说就是相关系数绝对值越大(值越接近1或者-1),当取值为0表示不相关,取值为(0~-1]表示负相关,取值为(0, 1]表示正相关。

2.1、Pearson相关系数

Pearson相关系数表达的是两个数值变量的线性相关性, 它一般适用于正态分布。其取值范围是[-1, 1], 当取值为0表示不相关,取值为[-1~0)表示负相关,取值为(0, 1]表示正相关。

 

img

Pearson相关系数

2.2、Spearman相关系数

Spearman相关系数也用来表达两个变量的相关性,但是它没有Pearson相关系数对变量的分布要求那么严格,另外Spearman相关系数可以更好地用于测度变量的排序关系。其计算公式为:

 

img

2.3 Spearman相关系数

统计提供了计算序列之间相关性的方法。 根据输入类型,两个JavaDoubleRDD或JavaRDD <Vector>,输出将分别为Double或相关矩阵。

JavaDoubleRDD seriesX = source.mapTodouble(x->Double.parseDouble(x.split(",")[0]));

JavaDoubleRDD seriesY = source.mapTodouble(x->Double.parseDouble(x.split(",")[1])); //必须具有与seriesX相同数量的分区和基数

Double correlation = Statistics.corr(seriesX.srdd(), seriesY.srdd(), "pearson");//使用Pearson方法计算相关性。 为Spearman的方法输入“spearman”。如果未指定方法认情况下将使用Pearson的方法。

System.out.println("Correlation is: " + correlation);

/**控制台输出结果:

\---------------------------------------------------- 

Correlation is: -0.10936924995062468

\----------------------------------------------------- 

**/ 

说明数据集的前两列,即花萼长度和花萼宽度具有微小的负相关性。

JavaRDD<Vector> data = source.map(line -> {   

  String[] parts = line.split(",");   

  return Vectors.dense(Double.valueOf(parts[0]),       

  Double.valueOf(parts[1]));//将RDD<String>转化为RDD<Vector>

}); //请注意,每个Vector都是一行而不是一列

Matrix correlMatrix = Statistics.corr(data.rdd(), "pearson");

System.out.println(correlMatrix.toString());

/**控制台输出结果:

\-----------------------------------------------------

**1.0          -0.10936924995062468

**-0.10936924995062468 1.0

\-------------------------------------------------------

**/ 

 

 


三、分层抽样 Stratified sampling

与spark.mllib中的其他统计函数不同,可以对RDD的键值对执行分层抽样方法sampleByKey和sampleByKeyExact。 对于分层抽样,可以将键视为标签,将值视为特定属性。 例如,密钥可以是人或女人,或文档ID,并且相应的值可以是人口中的人的年龄列表或文档中的单词列表。 sampleByKey方法将翻转硬币以决定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。 sampleByKeyExact比sampleByKey中使用的每层简单随机抽样需要更多的资源,但是会提供99.99%置信度的精确抽样大小。

3.1、sampleByKey 方法

sampleByKeyExact()允许用户准确地采样⌈fk⋅nk⌉∀k∈K项,其中fk是密钥k的期望分数,nk是密钥k的键 - 值对的数量,K是密钥集。 无需更换的采样需要在RDD上额外通过一次以确保样本大小,而更换采样则需要两次额外的通过。

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaPairRDD;

import org.spark_project.guava.collect.ImmutableMap;

import scala.Tuple2;

List<Tuple2<String, String>> list = Arrays.asList(        

new Tuple2<>("female","Lily"),        

new Tuple2<>("female","Lucy"),        

new Tuple2<>("female","Emily"),        

 new Tuple2<>("female","Kate"),        

 new Tuple2<>("female","Alice"),        

new Tuple2<>("male","Tom"),        

 new Tuple2<>("male","Roy"),        

new Tuple2<>("male","David"),        

new Tuple2<>("male","Frank"),        

new Tuple2<>("male","Jack"));//创建了一组数据,分成 “female” 和 “male” 两类 

JavaPairRDD<String, String> data = sc.parallelizePairs(list);

ImmutableMap<String, Double> fractions = new ImmutableMap.Builder<String,Double>()        

 .put("female",0.6)        

.put("male",0.4)        

.build(); //从每个键Map <K,Double>中指定所需的精确分数

这里,设置采取60%的female和40%的male,因为数据中female和male各有5个样本,所以理想中的抽样结果应该是有3个female和2个male。接下来用sampleByKey进行抽样:

JavaPairRDD<String, String> approxSample = data.sampleByKey(false, fractions,1); //从每个层获取大致样本

approxSample.foreach(x->{  

System.out.println(x);  //打印approxSample分层取样的结果;

});

/** *控制台输出结果:

\-----------------------

 (female,Lily) 

(female,Lucy)

 (female,Emily) 

(female,Kate) 

(male,Roy) 

(male,Frank)

\--------------------

**/ 

从上面可以看到,本应该抽取3个female和2个male,但结果抽取了5个female和1个male,结果并不够准确,不过在样本数量足够大且要求一定效率的情况下,用sampleByKey进行抽样还是可行的。

3.2、sampleByKeyExact 方法

sampleByKey 和 sampleByKeyExact 的区别在于 sampleByKey 每次都通过给定的概率以一种类似于掷硬币的方式来决定这个观察值是否被放入样本,因此一遍就可以过滤完所有数据,最后得到一个近似大小的样本,但往往不够准确。而 sampleByKeyExtra 会对全量数据做采样计算。对于每个类别,其都会产生 (fk⋅nk)个样本,其中fk是键为k的样本类别采样的比例;nk是键k所拥有的样本数。 sampleByKeyExtra 采样的结果会更准确,有99.99%的置信度,但耗费的计算资源也更多。

JavaPairRDD<String, String> exactSample = data.sampleByKeyExact(false, fractions,1);

exactSample.foreach(x->{   

System.out.println(x);//打印exactSample分层取样的结果;});

/***控制台输出结果:

\--------------------

(female,Lily)

(female,Emily)

(female,Kate)

(male,Roy)

(male,Frank)

\--------------------

**/

从实验结果可以看出,所得结果和预想一致,但当样本数量比较大时,可能会耗时较久。

 


四、假设检验 hypothesis testing

假设检验是统计学中一种强有力的工具,用于确定结果是否具有统计显着性,无论该结果是否偶然发生。 spark.mllib目前支持Pearson的卡方(χ2)测试,以确保拟合和独立性。 不同的输入类型决定了是做拟合度检验还是独立性检验。拟合度检验要求输入为Vector, 独立性检验要求输入是Matrix。

> import org.apache.spark.SparkConf;
>
> import org.apache.spark.api.java.JavaRDD;
>
> import org.apache.spark.api.java.JavaSparkContext;
>
> import org.apache.spark.mllib.linalg.Matrices;
>
> import org.apache.spark.mllib.linalg.Matrix;
>
> import org.apache.spark.mllib.linalg.Vector;
>
> import org.apache.spark.mllib.linalg.Vectors;
>
> import org.apache.spark.mllib.stat.Statistics;
>
> import org.apache.spark.mllib.stat.test.ChiSqTestResult;

> JavaRDD<Vector> vectors1 = source.map(x->{   
>
> String[] splits = x.split(",");  
>
> return Vectors.dense(Double.parseDouble(splits[0]),       
>
>  Double.parseDouble(splits[1]),        
>
> Double.parseDouble(splits[2]),        
>
> Double.parseDouble(splits[3]));
>
> });
>
> Vector v1 = vectors1.first();//获取 iris数据集中的第1条记录 ,类型为Vector
>
> Vector v2 = vectors1.take(2).get(1);//获取 iris数据集中的第2条记录 ,类型为Vector 

 

4.1、适合度检验 Goodness fo fit
> ChiSqTestResult goodnessOfFitTestResult = Statistics.chiSqTest(v1);
>
> System.out.println("goodnessOfFitTestResult:"+goodnessOfFitTestResult);

> /***控制台输出结果:
>
> \------------------------------------------------------------------------------------------------------------------------
>
> goodnessOfFitTestResult:Chi squared test summary:
>
> method: pearson
>
> degrees of freedom = 3
>
> statistic = 5.588235294117647
>
> pValue = 0.1334553914430291
>
> No presumption against null hypothesis: observed follows the same distribution as expected..
>
> \-----------------------------------------------------------------------------------------------------------------------------
>
> **/

可以看到P值,自由度,检验统计量,所使用的方法,以及零假设等信息。我们先简单介绍下每个输出的意义:

method: 方法。这里采用pearson方法

statistic: 检验统计量。简单来说就是用来决定是否可以拒绝原假设的证据。检验统计量的值是利用样本数据计算得到的,它代表了样本中的信息。检验统计量的绝对值越大,拒绝原假设的理由越充分,反之,不拒绝原假设的理由越充分。

degrees of freedom:自由度。表示可自由变动的样本观测值的数目,

pValue:统计学根据显著性检验方法所得到的P 值。一般以P < 0.05 为显著, P<0.01 为非常显著,其含义是样本间的差异由抽样误差所致的概率小于0.05 或0.01。

一般来说,假设检验主要看P值就够了。在本例中pValue =0.133,说明两组的差别无显著意义。通过V1的观测值[5.1, 3.5, 1.4, 0.2],无法拒绝其服从于期望分配(这里认是均匀分配)的假设。

4.2、独立性检验 Indenpendence

卡方独立性检验是用来检验两个属性间是否独立。其中一个属性做为行,另外一个做为列,通过貌似相关的关系考察其是否真实存在相关性。比如天气温变化和肺炎发病率。

> Matrix mat = Matrices.dense(2, 2, new double[]{v1.apply(0), v1.apply(1),v2.apply(0), v2.apply(1)});
>
> System.out.println("mat:"+mat);

> /***
>
> 控制台输出结果:
>
> \---------------------------
>
> mat:5.1 4.9 3.5 3.0 
>
> \----------------------------
>
> **/

同样的,键值对也可以进行独立性检验,这里我们取iris的数据组成键值对:

> JavaRDD<LabeledPoint> LabeledPoints = source.map(x->{   
>
> String[] splits = x.split(",");   
>
> Double label = 0.0;   
>
> if(splits[4].equals("Iris-setosa")) {      
>
> ​     label = 0.0;   
>
> }else if(splits[4].equals("Iris-versicolor")) { 
>
> ​     label = 1.0;   
>
> }else {      
>
> ​     label = 2.0;   
>
> }   
>
> return new LabeledPoint(label,Vectors.dense(Double.parseDouble(splits[0]),         
>
>    Double.parseDouble(splits[1]),          
>
>    Double.parseDouble(splits[2]),        
>
>    Double.parseDouble(splits[3])));
>
> });

> ChiSqTestResult[] c2 = Statistics.chiSqTest(LabeledPoints);
>
> for (ChiSqTestResult chiSqTestResult : c2) {  
>
>  System.out.println("c2:"+chiSqTestResult);}
>
> /***控制台输出结果:
>
> \-----------------------------------------------------------------------------------------------------------------------------------------------------
>
> c2:Chi squared test summary:
>
> method: pearson
>
> degrees of freedom = 68
>
> statistic = 156.26666666666665
>
> pValue = 6.6659873176888595E-9
>
> Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
>
> c2:Chi squared test summary:
>
> method: pearson
>
> degrees of freedom = 44
>
> statistic = 88.36446886446883
>
> pValue = 8.303947787857702E-5
>
> Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
>
> c2:Chi squared test summary:
>
> method: pearson
>
> degrees of freedom = 84
>
> statistic = 271.79999999999995
>
> pValue = 0.0
>
> Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
>
> c2:Chi squared test summary:
>
> method: pearson
>
> degrees of freedom = 42statistic = 271.75
>
> pValue = 0.0
>
> Very strong presumption against null hypothesis: the occurrence of the outcomes is statistically independent..
>
> \---------------------------------------------------------------------------------------------------------------------------------------------------
>
> **/

这里实际上是把特征数据中的每一列都与标签进行独立性检验。可以看出,P值都非常小,说明可以拒绝“某列与标签列无关”的假设。也就是说,可以认为每一列的数据都与最后的标签有相关性。

 


五、随机生成 random data generation

RandomrDDs 是一个工具集,用来生成含有随机数的RDD,可以按各种给定的分布模式生成数据集,Random RDDs包下现支持正态分布、泊松分布和均匀分布三种分布方式。RandomrDDs提供随机double RDDS或vector RDDS。

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaDoubleRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.mllib.random.RandomrDDs;

JavaDoubleRDD u = RandomrDDs.normalJavaRDD(sc, 1000000L, 10); //生成1000000个服从正态分配N(0,1)的RDD[Double],并且分布在 10 个分区中:

JavaDoubleRDD v = u.mapTodouble(x->1.0+2.0*x);  //把生成随机数转化成N(1,4) 正态分布: 

 


六、核密度估计 Kernel density estimation

Spark ML 提供了一个工具类 KernelDensity 用于核密度估算,核密度估算的意思是根据已知的样本估计未知的密度,属於非参数检验方法之一。核密度估计的原理是。观察某一事物的已知分布,如果某一个数在观察中出现了,可认为这个数的概率密度很大,和这个数比较近的数的概率密度也会比较大,而那些离这个数远的数的概率密度会比较小。Spark1.6.2版本支持高斯核(Gaussian kernel)。

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.mllib.stat.KernelDensity;

JavaRDD<Double> d = source.map(t -> Double.parseDouble(t.split(",")[0]));  //用样本数据构建核函数,这里用假设检验中得到的iris的第一个属性的数据作为样本数据进行估计:

KernelDensity kd = new KernelDensity().setBandwidth(3.0).setSample(d); //其中setBandwidth表示高斯核的宽度,为一个平滑参数,可以看做是高斯核的标准差。

double[] densities = kd.estimate(new double[] {-1.0, 2.0, 5.0, 5.8});

System.out.println(Arrays.toString(densities));

/***

控制台输出结果:

\------------------------------------------------------------------------------------------------------------------------------------

[0.011372003554433527, 0.05992591135719891, 0.12365409462424529, 0.12816280708978112]

\-------------------------------------------------------------------------------------------------------------------------------------

**/ 

四、spark MlLib之逻辑回归

img

一、概念

逻辑斯蒂回归(logistic regression)是统计学习中的经典分类方法,属于对数线性模型。logistic回归的因变量可以是二分类的,也可以是多分类的。logistic回归的因变量可以是二分非线性差分方程类的,也可以是多分类的,但是二分类的更为常用,也更加容易解释。所以实际中最为常用的就是二分类的logistic回归。

二、logistic分布

设X是连续随机变量,X服从逻辑斯蒂分布是指X具有下列分布函数和密度函数

 

img

分布函数和密度函数

式中,μ为位置参数,γ>0为形状参数。

密度函数是脉冲函数

分布函数是一条Sigmoid曲线(sigmoid curve)即为阶跃函数

 

img

Sigmoid曲线

三、二项逻辑斯谛回归模型

二项逻辑斯谛回归模型是如下的条件概率分布

img

回归模型

x∊Rn是输入,Y∊{0,1}是输出,w∊Rn和b∊R是参数,

w称为权值向量,b称为偏置,w·x为w和x的内积。

可以求得P(Y=1|x)和P(Y=0|x)。

逻辑斯谛回归比较两个条件概率值的大小,将实例x分到概率值较大的那一类。

四、LR模型参数估计

可以应用极大似然估计法估计模型参数

 

img

极大似然估计

对L(w)求极大值,得到w的估计值。

问题就变成了以对数似然函数为目标函数的最优化问题。

LR学习中通常采用的方法是梯度下降法及拟牛顿法。

五、代码实现

我们以iris数据集(https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data)为例进行分析。iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。

 

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
5.1、读取数据

首先,读取文本文件;然后,通过map将每行的数据用“,”隔开,在我们的数据集中,每行被分成了5部分,前4部分是鸢尾花的4个特征,最后一部分是鸢尾花的分类。把这里我们用LabeledPoint来存储标签列和特征列。 LabeledPoint在监督学习中常用来存储标签和特征,其中要求标签的类型是double,特征的类型是Vector。这里,先把莺尾花的分类进行变换,”Iris-setosa”对应分类0,”Iris-versicolor”对应分类1,其余对应分类2;然后获取莺尾花的4个特征,存储在Vector中。

 

SparkConf conf = new  SparkConf().setAppName("LogisticRegression").setMaster("local");
JavaSparkContext sc = new  JavaSparkContext(conf);
JavaRDD<String> source =  sc.textFile("data/mllib/iris.data");

JavaRDD<LabeledPoint> data = source.map(line->{
            String[] splits = line.split(",");
            Double label = 0.0;
            if(splits[4].equals("Iris-setosa"))  {
                label = 0.0;
            }else  if(splits[4].equals("Iris-versicolor")) {
                label = 1.0;
            }else {
                label = 2.0;
            }
            return new  LabeledPoint(label,Vectors.dense(Double.parseDouble(splits[0]),
                    Double.parseDouble(splits[1]),
                    Double.parseDouble(splits[2]),
                    Double.parseDouble(splits[3])));
 });

打印数据:

 

// 控制台输出结果:
(0.0,[5.1,3.5,1.4,0.2])
(0.0,[4.9,3.0,1.4,0.2])
(0.0,[4.7,3.2,1.3,0.2])
(0.0,[4.6,3.1,1.5,0.2])
(0.0,[5.0,3.6,1.4,0.2])
(0.0,[5.4,3.9,1.7,0.4])
(0.0,[4.6,3.4,1.4,0.3])
(0.0,[5.0,3.4,1.5,0.2])
(0.0,[4.4,2.9,1.4,0.2])
(0.0,[4.9,3.1,1.5,0.1])
(0.0,[5.4,3.7,1.5,0.2])
... ...
5.2、构建模型:

 

// 首先进行数据集的划分,这里划分60%的训练集和40%的测试集:
JavaRDD<LabeledPoint>[] splits =  data.randomSplit(new double[] {0.6,0.4},11L);
JavaRDD<LabeledPoint> traning =  splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];

构建逻辑斯蒂模型,用set的方法设置参数,比如说分类的数目,这里可以实现多分类逻辑斯蒂模型:

 

LogisticRegressionModel model = new LogisticRegressionWithLBFGS().setNumClasses(3).run(traning.rdd());

输出结果:

 

org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0,  numFeatures = 8, numClasses = 3, threshold = 0.5

接下来,调用分类逻辑斯蒂模型用的predict方法对测试数据进行预测,并把结果保存在MulticlassMetrics中。这里的模型全名为LogisticRegressionWithLBFGS,加上了LBFGS,表示Limited-memory BFGS。其中,BFGS是求解非线性优化问题(L(w)求极大值)的方法,是一种秩-2更新,以其发明者broyden, Fletcher, Goldfarb和Shanno的姓氏首字母命名。

 

JavaPairRDD<Object,Object> predictionAndLables =  test.mapToPair(p->
            new  Tuple2<>(model.predict(p.features()),p.label())
);

这里,采用了test部分的数据每一行都分为标签label和特征features,然后利用map方法,对每一行的数据进行model.predict(features)操作,获得预测值。并把预测值和真正的标签放到predictionAndLabels中。我们可以打印出具体的结果数据来看一下:

 

(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(2.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(1.0,1.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(1.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)
(2.0,2.0)

可以看出,大部分的预测是对的。其中(2.0,1.0),(1.0,2.0)的预测与实际标签不同。

5.3、模型评估

模型预测的准确性打印:

 

//准确性打印:
metrics:0.9615384615384616

五、spark MlLib之支持向量机SVM

img

分类

分类旨在将项目分为不同类别。 最常见的分类类型是二元分类,其中有两类,通常分别为正数和负数。 如果有两个以上的类别,则称为多类分类。 spark.mllib支持两种线性分类方法:线性支持向量机(SVM)和逻辑回归。 线性SVM仅支持二进制分类,而逻辑回归支持二进制和多类分类问题。 对于这两种方法,spark.mllib支持L1和L2正则化变体。 训练数据集由MLlib中LabeledPoint的RDD表示,其中标签是从零开始的类索引:0,1,2,....

一、基本思想

统计学习理论是在传统统计学基础上发展起来的一种机器学习方法 。SVM 的基本思想可由图 1说明 ,在二维两类线性可分情况下,有很多可能的线性分类器可以把这组数据分割开,但是只有一个使两类的分类间隔 margin最大,即图中的 H,这个线性分类器就是最优分类超平面,与其它分类器相比 ,具有更好的泛化性 。

img

最优分类超平面

二、计算公式

假设超平面可描述为:

 

img

假设超平面公式

 

线性SVM是大规模分类任务的标准方法。 其学习策略是使数据间的间隔最大化,最终可转化为一个凸二次规划问题的求解。

分类器的损失函数(hinge loss铰链损失):

L(w;x,y):=max{0,1−ywTx}.

认情况下,线性SVM使用L2正则化进行训练。 我们还支持替代L1正则化。 在这种情况下,问题变成线性程序。线性SVM算法输出SVM模型。 给定一个新的数据点,用x表示,该模型根据wTx的值进行预测。 认情况下,如果wTx≥0则结果为正,否则为负。

三、代码实现

 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import  org.apache.spark.api.java.JavaSparkContext;
import  org.apache.spark.mllib.classification.SVMModel;
import  org.apache.spark.mllib.classification.SVMWithSGD;
import  org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.linalg.Vectors;
import  org.apache.spark.mllib.regression.LabeledPoint;
3.1、读取数据:

 

SparkConf conf = new  SparkConf().setAppName("SVM").setMaster("local");
JavaSparkContext sc = new  JavaSparkContext(conf);             
JavaRDD<String> source =  sc.textFile("data/mllib/iris.data");   

用LabeledPoint来存储标签列和特征列。 LabeledPoint在监督学习中常用来存储标签和特征,其中要求标签的类型是double,特征的类型是Vector

 

JavaRDD<LabeledPoint> data =  source.map(line->{            
String[] parts = line.split(",");           
 double label = 0.0;           
 if(parts[4].equals("Iris-setosa")) {                
            label = 0.0;            
}else  if(parts[4].equals("Iris-versicolor")) {        
            label = 1.0;           
 }else {              
            label = 2.0;         
}           
 return new  LabeledPoint(label,Vectors.dense(Double.parseDouble(parts[0]),                 
                                              Double.parseDouble(parts[1]),                   
                                              Double.parseDouble(parts[2]),                  
                                              Double.parseDouble(parts[3])));       
 });
3.2、 构建模型

因为SVM只支持2分类,所以我们要进行一下数据抽取,这里我们通过filter过滤掉第2类的数据,只选取第0类和第1类的数据。然后,我们把数据集划分成两部分,其中训练集占60%,测试集占40%

 

JavaRDD<LabeledPoint>[] filters =  data.filter(line->{ 
        return line.label()!=2; 
}).randomSplit(new  double[]{0.6,0.4},11L);
JavaRDD<LabeledPoint> training =  filters[0].cache();
JavaRDD<LabeledPoint> test = filters[1]; 

接下来,通过训练集构建模型SVMWithSGD。这里的SGD即著名的随机梯度下降算法(stochastic Gradient Descent)。设置迭代次数为1000,除此之外还有stepSize(迭代步伐大小),regParam(regularization正则化控制参数),miniBatchFraction(每次迭代参与计算的样本比例),initialWeights(weight向量初始值)等参数可以进行设置。

 

//构建训练集 SVMWithSGD  
// SGD即著名的随机梯度下降算法(stochastic  Gradient Descent)  
// 设置迭代次数为1000,  
// 除此之外还有stepSize(迭代步伐大小),   
// regParam(regularization正则化控制参数),  
// miniBatchFraction(每次迭代参与计算的样本比例),  
//initialWeights(weight向量初始值)等参数可以进行设置*/
SVMModel model =  SVMWithSGD.train(training.rdd(), 1000); 
3.3、 模型评估

 

//清除认阈值,这样会输出原始的预测评分,即带有确信度的结果
model.clearThreshold();   
JavaRDD<Tuple2<Object,Object>>  scoreAndLabels = test.map(point->   
    new  Tuple2<>(model.predict(point.features()),point.label()));
    scoreAndLabels.foreach(x->{          
    System.out.println(x);
});

//输出结果:
(-2.627551665051128,0.0)(-2.145161194882099,0.0)
(-2.3068829871403618,0.0)(-3.0554378212130096,0.0)
(-2.3698036980710446,0.0)(-2.335545287277434,0.0)
(-2.6962358412306786,0.0)(-2.8222115665081975,0.0)
(-3.5549967121975508,0.0)(-1.963540537080021,0.0)
(-2.8307953180240637,0.0)(-3.5132621172293095,0.0)
(-3.8139420880575643,0.0)(-2.6303719513181254,0.0)
(-1.4913566958139257,0.0)(-2.5373343352394144,0.0)
(-2.4271282983451896,0.0)(-2.6590342514551977,0.0)
(3.2420043610860385,1.0)(3.5440500131703354,1.0)
(3.067344577412759,1.0)(3.269179005035978,1.0)
(2.141265211522379,1.0)(3.705816267306055,1.0)
(4.418311047904414,1.0)(2.955773777046275,1.0)
(4.117932735084642,1.0)(3.904874870539733,1.0)
(2.061176997559964,1.0)(2.685256091027288,1.0)
(3.210566236559426,1.0)(3.963262576277656,1.0)
(3.299206068645311,1.0)(3.7974891199125067,1.0)...  ...

那如果设置了阈值,则会把大于阈值的结果当成正预测,小于阈值的结果当成负预测。

 

model.setThreshold(0.0); 
scoreAndLabels.foreach(x->{   
     System.out.println(x);
});

输出结果:

 

(0.0,0.0)(0.0,0.0)(0.0,0.0)
(0.0,0.0)(0.0,0.0)(0.0,0.0)
(0.0,0.0)(0.0,0.0)(0.0,0.0)
(0.0,0.0)(0.0,0.0)(0.0,0.0)
(0.0,0.0)(0.0,0.0)(0.0,0.0)
(0.0,0.0)(0.0,0.0)(0.0,0.0)
(1.0,1.0)(1.0,1.0)(1.0,1.0)
(1.0,1.0)(1.0,1.0)(1.0,1.0)
(1.0,1.0)(1.0,1.0)...  ...

构建评估矩阵,把模型预测的准确性打印出来:

 

BinaryClassificationMetrics metrics = new  BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
System.out.println("Area under ROC =  "+metrics.areaUnderROC());
//结果打印:
Area under ROC = 1.0

其中, SVMWithSGD.train() 方法认的通过把正则化参数设为1来执行来范数。如果我们想配置这个算法,可以通过创建一个新的 SVMWithSGD对象然后调用他的setter方法来进行重新配置。下面这个例子,我们构建了一个正则化参数为0.1的L1正则化SVM方法 ,然后迭代这个训练算法2000次。

 

SVMWithSGD sgd = new SVMWithSGD();
sgd.optimizer().setRegParam(0.1).setNumIterations(2000).setUpdater(new L1Updater());
SVMModel modelL1 =  sgd.run(training.rdd());
System.out.println("modelL1:"+modelL1);
//打印结果:
modelL1:org.apache.spark.mllib.classification.SVMModel:
intercept = 0.0,  numFeatures = 4, numClasses = 2, threshold = 0.0

模型保存和加载:

 

model.save(sc.sc(), "data/mllib/FiveSVM");
SVMModel sameModel =  SVMModel.load(sc.sc(), "data/mllib/FiveSVM"); 
四、性质

稳健性与稀疏性:SVM的优化问题同时考虑了经验风险和结构风险最小化,因此具有稳定性。从几何观点,SVM的稳定性体现在其构建超平面决策边界时要求边距最大,因此间隔边界之间有充裕的空间包容测试样本 。SVM使用铰链损失函数作为代理损失,铰链损失函数的取值特点使SVM具有稀疏性,即其决策边界仅由支持向量决定,其余的样本点不参与经验风险最小化 。在使用核方法的非线性学习中,SVM的稳健性和稀疏性在确保了可靠求解结果的同时降低了核矩阵的计算量和内存开销。

五、应用

SVM在各领域的模式识别问题中有广泛应用,包括人像识别(face recognition) 、文本分类(text categorization) 、笔迹识别(handwriting recognition) 、生物信息学 等。

六、spark MlLib之决策树

img

一、概念

决策树及其集合是分类和回归的机器学习任务的流行方法。 决策树被广泛使用,因为它们易于解释,处理分类特征,扩展到多类分类设置,不需要特征缩放,并且能够捕获非线性和特征交互。 诸如随机森林和增强的树集合算法是分类和回归任务的最佳表现者。

决策树(decision tree)是一种基本的分类与回归方法,这里主要介绍用于分类的决策树。决策树模式呈树形结构,其中每个内部节点表示一个属性上的测试,每个分支代表一个测试输出,每个叶节点代表一种类别。学习时利用训练数据,根据损失函数最小化的原则建立决策树模型;预测时,对新的数据,利用决策树模型进行分类

二、基本原理

决策树学习通常包含三个方面:特征选择、决策树生成和决策树剪枝。决策树学习思想主要来源于:Quinlan在1986年提出的ID算法、在1993年提出的C4.5算法和Breiman等人在1984年提出的CART算法。

2.1、特征选择

特征选择在于选取对训练数据具有分类能力的特征,这样可以提高决策树学习的效率。通常特征选择的准则是信息增益(或信息增益比、基尼指数等),每次计算每个特征的信息增益,并比较它们的大小,选择信息增益最大(信息增益比最大、基尼指数最小)的特征。

那么问题来了:怎么找到这样的最优划分特征呢?如何来衡量最优?

什么是最优特征,通俗的理解是对训练数据具有很强的分类能力的特征,比如要看相亲的男女是否合适,他们的年龄差这个特征就远比他们的出生地重要,因为年龄差能更好得对相亲是否成功这个分类问题具有更强的分类能力。

但是计算机并不知道哪些特征是最优的,因此,就要找一个衡量特征是不是最优的指标,使得决策树在每一个分支上的数据尽可能属于同一类别的数据,即样本纯度最高。

我们用熵来衡量样本集合的纯度。

这是概率统计与信息论中的一个概念,定义为:

 

img

其中p(x)=pi表示随机变量X发生概率。

我们可以从两个角度理解这个概念。

第一就是不确定度的一个度量,我们的目标是为了找到一颗树,使得每个分枝上都代表一个分类,也就是说我们希望这个分枝上的不确定性最小,即确定性最大,也就是这些数据都是同一个类别的。熵越小,代表这些数据是同一类别的越多。

第二个角度就是从纯度理解。因为熵是不确定度的度量,如果他们不确定度越小,意味着这个群体的差异很小,也就是它的纯度很高。比如,在明大的某富翁聚会上,来的人大多是某总,普通工薪白领就会很少,如果新来了一个刘总,他是富翁的确定性就很大,不确定性就很小,同时这个群体的纯度很大。总结来说就是熵越小,纯度越大,而我们希望的就是纯度越大越好。

信息增益

我们用信息熵来衡量一个分支的纯度,以及哪个特征是最优的特征 在决策树学习中应用信息增益准则来选择最优特征。信息增益定义如下:

 

img

信息增益

 

特征A对训练数据集D的信息增益g(D,A) 等于D的不确定度H(D) 减去给定条件A下D的不确定度H(D|A),可以理解为由于特征A使得对数据集D的分类的不确定性减少的程度,信息增益大的特征具有更强的分类能力。

信息增益率

信息增益选择特征倾向于选择取值较多的特征,假设某个属性存在大量的不同值,决策树在选择属性时,将偏向于选择该属性,但这肯定是不正确(导致过拟合)的。因此有必要使用一种更好的方法,那就是信息增益率(Info Gain Ratio)来矫正这一问题。 其公式为:

 

img

信息增益率

 

 

其中

img

训练数据集D关于特征A的值的熵

,n为特征A取值的个数

概率分布的基尼指数定义为

 

img

基尼指数

 

其中K表示分类问题中类别的个数

2.2、决策树的生成

从根结点开始,对结点计算所有可能的特征的信息增益,选择信息增益最大的特征作为结点的特征,由该特征的不同取值建立子结点,再对子结点递归地调用以上方法,构建决策树;直到所有特征的信息增均很小或没有特征可以选择为止,最后得到一个决策树。

决策树需要有停止条件来终止其生长的过程。一般来说最低的条件是:当该节点下面的所有记录都属于同一类,或者当所有的记录属性都具有相同的值时。这两种条件是停止决策树的必要条件,也是最低的条件。在实际运用中一般希望决策树提前停止生长,限定叶节点包含的最低数据量,以防止由于过度生长造成的过拟合问题。

2.3、决策树的剪枝

决策树生成只考虑了通过信息增益或信息增益比来对训练数据更好的拟合,但没有考虑到如果模型过于复杂,会导致过拟合的产生。而剪枝就是缓解过拟合的一种手段,单纯的决策树生成学习局部的模型,而剪枝后的决策树会生成学习整体的模型,因为剪枝的过程中,通过最小化损失函数,可以平衡决策树的对训练数据的拟合程度和整个模型的复杂度。

决策树的损失函数定义如下:

 

img

损失函数

 

 

其中,

img

图1

三、代码实现

我们以iris数据集(https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data)为例进行分析。iris以鸢尾花的特征作为数据来源,数据集包含150个数据集,分为3类,每类50个数据,每个数据包含4个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。

3.1、读取数据

首先,读取文本文件;然后,通过map将每行的数据用“,”隔开,在我们的数据集中,每行被分成了5部分,前4部分是鸢尾花的4个特征,最后一部分是鸢尾花的分类。把这里我们用LabeledPoint来存储标签列和特征列。LabeledPoint在监督学习中常用来存储标签和特征,其中要求标签的类型是double,特征的类型是Vector。所以,我们把莺尾花的分类进行了一下改变,”Iris-setosa”对应分类0,”Iris-versicolor”对应分类1,其余对应分类2;然后获取莺尾花的4个特征,存储在Vector中。

 

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import scala.Tuple2;
SparkConf conf = new  SparkConf().setAppName("decisionTree").setMaster("local");
JavaSparkContext sc = new  JavaSparkContext(conf);
        
/**
   * 读取数据
   * 转化成 LabeledPoint类型
*/
JavaRDD<String> source =  sc.textFile("data/mllib/iris.data");
JavaRDD<LabeledPoint> data =  source.map(line->{
    String[] parts = line.split(",");
    double label = 0.0;
    if(parts[4].equals("Iris-setosa")) {
                label = 0.0;
     }else  if(parts[4].equals("Iris-versicolor")) {
                label = 1.0;
     }else {
                label = 2.0;
     }
     return new  LabeledPoint(label,Vectors.dense(Double.parseDouble(parts[0]),
                              Double.parseDouble(parts[1]),
                              Double.parseDouble(parts[2]),
                              Double.parseDouble(parts[3])));
});
3.2、划分数据集

接下来,首先进行数据集的划分,这里划分70%的训练集和30%的测试集:

 

JavaRDD<LabeledPoint>[] splits =  data.randomSplit(new double[] {0.7,0.3});
JavaRDD<LabeledPoint> trainingData =  splits[0];
JavaRDD<LabeledPoint> testData =  splits[1];
3.3、构建模型

调用决策树的trainClassifier方法构建决策树模型,设置参数,比如分类数、信息增益的选择、树的最大深度等:

 

int numClasses = 3;//分类数
int maxDepth = 5; //树的最大深度
int maxBins = 30;//离散连续特征时使用的bin数。增加maxBins允许算法考虑更多的分割候选者并进行细粒度的分割决策。
String impurity = "gini";
Map<Integer,Integer>  categoricalFeaturesInfo = new  HashMap<Integer,Integer>();//空的categoricalFeaturesInfo表示所有功能都是连续的。
DecisionTreeModel model =  DecisionTree.trainClassifier(trainingData,  numClasses, categoricalFeaturesInfo, impurity,  maxDepth, maxBins);
3.4、模型预测

接下来我们调用决策树模型的predict方法对测试数据集进行预测,并把模型结构打印出来:

 

JavaPairRDD<Double, Double> predictionAndLabel =  testData.mapToPair(point->{
      return new Tuple2<>(model.predict(point.features()),point.label());
});
//打印预测和实际结果
predictionAndLabel.foreach(x->{
      System.out.println("predictionAndLabel:"+x);
});
System.out.println("Learned  classification tree  model:"+model.toDebugString());
/**
*控制台输出结果:
-----------------------
Learned classification tree model:DecisionTreeModel classifier of depth 5 with 15  nodes
  If (feature 2 <= 2.45)
   Predict: 0.0
  Else (feature 2 > 2.45)
   If (feature 2 <= 4.75)
    Predict: 1.0
   Else (feature 2 > 4.75)
    If (feature 2 <= 4.95)
     If (feature 0 <= 6.25)
      If (feature 1 <= 3.05)
       Predict: 2.0
      Else (feature 1 > 3.05)
       Predict: 1.0
     Else (feature 0 > 6.25)
      Predict: 1.0
    Else (feature 2 > 4.95)
     If (feature 3 <= 1.7000000000000002)
      If (feature 0 <= 6.05)
       Predict: 1.0
      Else (feature 0 > 6.05)
       Predict: 2.0
     Else (feature 3 > 1.7000000000000002)
      Predict: 2.0
------------------------
**/
3.5、准确性评估

最后,我们把模型预测的准确性打印出来:

 

double testErr = predictionAndLabel.filter(pl ->  !pl._1().equals(pl._2())).count() / (double)  testData.count();
System.out.println("Test  Error:"+testErr);
/**
*控制台输出结果:
------------------------------
Test Error:0.06976744186046512
------------------------------
**/

七、spark MlLib之奇异值分解-SVD

img

奇异值分解-SVD

【版权声明】本文为原创,转载请注明原地址 http://www.wangpengcufe.com/machinelearning/ml-ml7/

降维(Dimensionality Reduction) 是机器学习中的一种重要的特征处理手段,它可以减少计算过程中考虑到的随机变量(即特征)的个数,其被广泛应用于各种机器学习问题中,用于消除噪声、对抗数据稀疏问题。它在尽可能维持原始数据的内在结构的前提下,得到一组描述原数据的,低维度的隐式特征(或称主要特征)。简单来说,在高维数据中筛选出对我们有用的变量,减小计算复杂度提高模型训练效率和准确率,这就是我们要说的降维。

MLlib机器学习库提供了两个常用的降维方法奇异值分解(Singular Value Decomposition,SVD)主成分分析(Principal Component Analysis,PCA),下面我们将通过实例介绍其具体的使用方法

一、公式和原理

奇异值分解(SVD)将矩阵A分解为三个矩阵:U,Σ和V,如下公式

 

 

公式:

img

奇异值分解公式

其中

左奇异矩阵 :,U一个标准正交矩阵,也叫实对称矩阵,怎么理解这个概念呢?就是说矩阵A的转置等于其本身,或者说矩阵U的维度为m×m ,用符号表示为

img

矩阵U

,我们称U 为。

奇异值 : Σ一个对角矩阵,仅在主对角线上有值,其它元素均为0,用符合表示为

img

奇异值

 

,我们称Σ为奇异值

右奇异矩阵V也是一个正交矩阵,这会儿知道是啥意思了吧,和U的解释一样,用符号表示为

img

图5

 

,我们称 V右奇异矩阵

奇异值分解 : 就是想要找到一个比较小的值k,保留前k个奇异向量和奇异值,其中 U 的维度从 m×m 变成了 m×k , V 的维度从 n×n 变成了 m×k ,Σ 的维度从 m×n 变成了 k×k 的方阵,从而达到降维效果

二、代码实现

Mllib内置的奇异值分解功能位于org.apache.spark.mllib.linalg包下的RowMatrix和IndexedRowMatrix类中,所以,我们必须先通过已有数据创建出相应矩阵类型的对象,然后调用该类的成员方法来进行SVD分解,这里以RowMatrix为例:

 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
2.1、准备一个矩阵

准备好一个矩阵,这里我们采用一个简单的文件a.mat来存储一个尺寸为(4,9)的矩阵,其内容如下:

 

1 2 3 4 5 6 7 8 9 
5 6 7 8 9 0 8 6 7 
9 0 8 7 1 4 3 2 1 
6 4 2 1 3 4 2 1 5
2.2、computeSVD方法计算分解结果

随后,将该文本文件读入成RDD[Vector],并转换成RowMatrix,即可调用RowMatrix自带的computeSVD方法计算分解结果,这一结果保存在类型为SingularValueDecomposition的svd对象中

 

SparkConf conf = new SparkConf().setAppName("SVD").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
        
JavaRDD<String> source = sc.textFile("data/mllib/a.data");
JavaRDD<Vector> data = source.map(line->{
    String[] parts = line.split(" ");
    return  Vectors.dense(Double.parseDouble(parts[0]),
            Double.parseDouble(parts[1]),
            Double.parseDouble(parts[2]));
});
RowMatrix rm = new RowMatrix(data.rdd());
2.3、保留前3个奇异值

 

SingularValueDecomposition<RowMatrix,Matrix> svd = rm.computeSVD(3,false,1.0E-9d);
2.4、得到V、s、U成员

通过访问svd对象的V、s、U成员分别拿到进行SVD分解后的右奇异矩阵、奇异值向量和左奇异矩阵:

 

System.out.println("s=================");
System.out.println(svd.s());
System.out.println("s=================");

System.out.println("V=================");
System.out.println(svd.V());
System.out.println("V=================");

System.out.println("U=================");
System.out.println(svd.U());
System.out.println("U=================");

控制台输出结果:

 

s========================================================
[28.741265581939565,10.847941223452608,7.089519467626695]
s========================================================

V===============================================================
-0.32908987300830383  0.6309429972945555    0.16077051991193514   
-0.2208243332000108   -0.1315794105679425   -0.2368641953308101   
-0.35540818799208057  0.39958899365222394   -0.147099615168733    
-0.37221718676772064  0.2541945113699779    -0.25918656625268804  
-0.3499773046239524   -0.24670052066546988  -0.34607608172732196  
-0.21080978995485605  0.036424486072344636  0.7867152486535043    
-0.38111806017302313  -0.1925222521055529   -0.09403561250768909  
-0.32751631238613577  -0.3056795887065441   0.09922623079118417   
-0.3982876638452927   -0.40941282445850646  0.26805622896042314 
V===============================================================
  
U====
null
U====

这里可以看到,由于限定了取前三个奇异值,所以奇异值向量s包含有三个从大到小排列的奇异值,而右奇异矩阵V中的每一列都代表了对应的右奇异向量。 U成员得到的是一个null值,这是因为在实际运用中,只需要V和S两个成员,即可通过矩阵计算达到降维的效果

如果需要获得U成员,可以在进行SVD分解时,指定computeU参数,令其等于True,即可在分解后的svd对象中拿到U成员,如下文所示:

 

SingularValueDecomposition<RowMatrix,  Matrix> svd = rm.computeSVD(3,true,1.0E-9d);
System.out.println("U=================");
System.out.println(svd.U());
System.out.println("U=================");

控制台输出结果:

 

U============================================================
org.apache.spark.mllib.linalg.distributed.RowMatrix@25c2a9e3
U============================================================
三、优缺点

降维的好处:减小数据维度和需要的存储空间,节约模型训练计算时间,去掉冗余变量,提高算法的准确度,有利于数据可视化。简化数据,去除噪声点,提高算法的结果。

缺点:数据的转换可能难以理解;

适用数据类型:数值型。

四、 应用

通过SVD对数据的处理,我们可以使用小得多的数据集来表示原始数据集,这样做实际上是去除了噪声和冗余信息,以此达到了优化数据、提高结果的目的。有以下应用:

隐形语义索引:最早的SVD应用之一就是信息检索,我们称利用SVD的方法为隐性语义检索(LSI)或隐形语义分析(LSA)

推荐系统:SVD的另一个应用就是推荐系统,较为先进的推荐系统先利用SVD从数据中构建一个主题空间,然后再在该空间下计算相似度,以此提高推荐的效果

八、spark MlLib主成分分析(PCA)

img

一、概念

主成分分析(Principal Component Analysis)是指将多个变量通过线性变换以选出较少数重要变量的一种多元统计分析方法,又称为主成分分析。在实际应用场合中,为了全面分析问题,往往提出很多与此有关的变量(或因素),因为每个变量都在不同程度上反映这个应用场合的某些信息。

主成分分析是设法将原来众多具有一定相关性(比如N个指标)的指标,重新组合成一组新的相互无关的综合指标来代替原来的指标,从而实现数据降维的目的,这也是MLlib的处理手段之一。

二、代码实现

 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.feature.PCA;
import org.apache.spark.mllib.feature.PCAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.rdd.RDD;

SparkConf conf = new  SparkConf().setAppName("PCA").setMaster("local");
JavaSparkContext sc = new  JavaSparkContext(conf);
        
/**
  *  使用test.data矩阵
     1 2 3 4 5 6 7 8 9
     5 6 7 8 9 0 8 6 7
     9 0 8 7 1 4 3 2 1
     6 4 2 1 3 4 2 1 5
  */
JavaRDD<String> source =  sc.textFile("data/mllib/test.data");
JavaRDD<Vector> data =  source.map(line->{
     String[] parts = line.split(" ");
     return  Vectors.dense(Double.parseDouble(parts[0]),
             Double.parseDouble(parts[1]),
             Double.parseDouble(parts[2]),
             Double.parseDouble(parts[3]),
             Double.parseDouble(parts[4]),
             Double.parseDouble(parts[5]),
             Double.parseDouble(parts[6]),
             Double.parseDouble(parts[7]),
             Double.parseDouble(parts[8]));
});
data.foreach(x->{
     System.out.println(x);
});

控制台输出结果:

 

[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0]
[5.0,6.0,7.0,8.0,9.0,0.0,8.0,6.0,7.0]
[9.0,0.0,8.0,7.0,1.0,4.0,3.0,2.0,1.0]
[6.0,4.0,2.0,1.0,3.0,4.0,2.0,1.0,5.0]

 

RowMatrix rm = new  RowMatrix(data.rdd());
Matrix pc =  rm.computePrincipalComponents(3);
System.out.println(pc);

控制台输出结果:

 

-0.41267731212833847   -0.3096216957951525    0.1822187433607524    
0.22357946922702987    -0.08150768817940773   0.5905947537762997    
-0.08813803143909382   -0.5339474873283436    -0.2258410886711858   
0.07580492185074224   -0.56869017430423       -0.28981327663106565  
0.4399389896865264     -0.23105821586820194   0.3185548657550075    
-0.08276152212493619  0.3798283369681188      -0.4216195003799105   
0.3952116027336311     -0.19598446496556066   -0.17237034054712738  
0.43580231831608096    -0.023441639969444372  -0.4151661847170216   
0.468703853681766     0.2288352748369381      0.04103087747663084

可以看到,主成分矩阵是一个尺寸为(9,3)的矩阵,其中每一列代表一个主成分(新坐标轴),每一行代表原有的一个特征,而a.data矩阵可以看成是一个有4个样本,9个特征的数据集,那么,主成分矩阵相当于把原有的9维特征空间投影到一个3维的空间中,从而达到降维的效果

 

RowMatrix rm2 = rm.multiply(pc);
RDD<Vector> v = rm2.rows();
JavaRDD<Vector> vector = v.toJavaRDD();
vector.foreach(x->{
   System.out.println(x);
});

控制台输出结果:

 

[12.247647483894383,-2.725468189870252,-5.568954759405281]
[12.284448024169402,-12.510510992280857,-0.16048149283293078]
[-1.2537294080109986,-10.15675264890709,-4.8697886049036025]
[2.8762985358626505,-2.2654415718974685,1.428630138613534]

MLlib提供的PCA变换方法最多只能处理65535维的数据。

九、spark MlLib之协同过滤算法

img

一、概念

协同过滤算法主要分为基于用户的协同过滤算法和基于项目的协同过滤算法。

img

基于用户的协同过滤算法和基于项目的协同过滤算法

1.1、以用户为基础(User-based)的协同过滤

用相似统计方法得到具有相似爱好或者兴趣的相邻用户,所以称之为以用户为基础(User-based)的协同过滤或基于邻居的协同过滤(Neighbor-based Collaborative Filtering)。

具体步骤为:

1.收集用户信息 收集可以代表用户兴趣的信息。一般的网站系统使用评分的方式或是给予评价,这种方式被称为“主动评分”。另外一种是“被动评分”,是根据用户的行为模式由系统代替用户完成评价,不需要用户直接打分或输入评价数据。电子商务网站在被动评分的数据获取上有其优势,用户购买的商品记录是相当有用的数据。

2.最近邻搜索(Nearest neighbor search, NNS) 以用户为基础(User-based)的协同过滤的出发点是与用户兴趣爱好相同的另一组用户,就是计算两个用户的相似度。例如:查找n个和A有相似兴趣用户,把他们对M的评分作为A对M的评分预测。一般会根据数据的不同选择不同的算法,较多使用的相似度算法有Pearson Correlation Coefficient、Cosine-based Similarity、Adjusted Cosine Similarity。

3.产生推荐结果 有了最近邻集合,就可以对目标用户的兴趣进行预测,产生推荐结果。依据推荐目的的不同进行不同形式的推荐,较常见的推荐结果有Top-N 推荐和关系推荐。Top-N 推荐是针对个体用户产生,对每个人产生不一样的结果,例如:通过对A用户的最近邻用户进行统计,选择出现频率高且在A用户的评分项目中不存在的,作为推荐结果。关系推荐是对最近邻用户的记录进行关系规则(association rules)挖掘。

1.2、以项目为基础(Item-based)的协同过滤

用户为基础的协同推荐算法随着用户数量的增多,计算的时间就会变长,所以在2001年Sarwar提出了基于项目的协同过滤推荐算法(Item-based Collaborative Filtering Algorithms)。以项目为基础的协同过滤方法一个基本的假设“能够引起用户兴趣的项目,必定与其之前评分高的项目相似”,通过计算项目之间的相似性来代替用户间的相似性。

具体步骤为:

1.收集用户信息 同以用户为基础(User-based)的协同过滤。

2.针对项目的最近邻搜索 先计算已评价项目和待预测项目的相似度,并以相似度作为权重,加权各已评价项目的分数,得到待预测项目的预测值。例如:要对项目 A 和项目 B 进行相似性计算,要先找出同时对 A 和 B 打过分的组合,对这些组合进行相似度计算,常用的算法同以用户为基础(User-based)的协同过滤。

3.产生推荐结果 以项目为基础的协同过滤不用考虑用户间的差别,所以精度比较差。但是却不需要用户的历史数据,或是进行用户识别。对于项目来讲,它们之间的相似性要稳定很多,因此可以离线完成工作量最大的相似性计算步骤,从而降低了在线计算量,提高推荐效率,尤其是在用户多于项目的情形下尤为显著。

二、隐式反馈 VS 显性反馈
2.1、概念

隐性反馈行为:不能明确反映用户喜好的行为。

显性反馈行为用户明确表示对物品喜好的行为。

显性反馈行为包括用户明确表示对物品喜好的行为,隐性反馈行为指的是那些不能明确反应用户喜好的行为。在许多的现实生活中的很多场景中,我们常常只能接触到隐性的反馈,例如页面游览,点击,购买,喜欢,分享等等。 基于矩阵分解的协同过滤的标准方法,一般将用户商品矩阵中的元素作为用户对商品的显性偏好。在 MLlib 中所用到的处理这种数据的方法来源于文献: Collaborative Filtering for Implicit Feedback Datasets 。 本质上,这个方法将数据作为二元偏好值和偏好强度的一个结合,而不是对评分矩阵直接进行建模。因此,评价就不是与用户对商品的显性评分,而是与所观察到的用户偏好强度关联起来。然后,这个模型将尝试找到隐语义因子来预估一个用户一个商品的偏好。

2.2、显性反馈数据和隐形反馈数据的比较

img

显性反馈数据和隐形反馈数据的比较

2.3、各代表网站中显性反馈数据和隐性反馈数据的例子

img

各代表网站中显性反馈数据和隐性反馈数据的例子

三、代码实现

下面代码读取spark的示例文件文件中每一行包括一个用户id、商品id和评分。我们使用认的ALS.train() 方法来构建推荐模型并评估模型的均方差。

3.1、导入需要的包:

 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.rating;
3.2、读取数据:

首先,读取文本文件,把数据转化成rating类型,即[Int, Int, Double]的RDD;

 

/**
  * 1、获取spark
*/
SparkConf conf = new  SparkConf().setAppName("CollaborativeModel").setMaster("local");
JavaSparkContext sc = new  JavaSparkContext(conf);
/**
* 2、读取文本文件,把数据转化成rating类型,即[Int, Int, Double]的RDD
*/
JavaRDD<String> source =  sc.textFile("data/mllib/collaborative.data");
JavaRDD<rating> ratings =  source.map(line->{
    String[] parts = line.split(",");
    return new  rating(Integer.parseInt(parts[0]),
            Integer.parseInt(parts[1]),
            Double.parseDouble(parts[2]));
});
ratings.foreach(x->{
    System.out.println(x);
});
/**
*控制台输出结果:
*rating中的第一个int是user编号,第二个int是item编号,最后的double是user对item的评分。
----------------
rating(1,1,5.0)
rating(3,2,5.0)
rating(1,2,1.0)
rating(3,3,1.0)
rating(1,3,5.0)
rating(3,4,5.0)
rating(1,4,1.0)
rating(4,1,1.0)
rating(2,1,5.0)
rating(4,2,5.0)
rating(2,2,1.0)
rating(4,3,1.0)
rating(2,3,5.0)
rating(4,4,5.0)
rating(2,4,1.0)
rating(3,1,1.0)
----------------
**/
3.3、构建模型

划分训练集和测试集,比例分别是0.8和0.2。

 

JavaRDD<rating>[] splits =  ratings.randomSplit(new double[] {0.8,0.2});
JavaRDD<rating> training = splits[0];
JavaRDD<rating> test = splits[1];

指定参数值,然后使用ALS训练数据建立推荐模型:

 

int rank = 10;
int numIterations = 10;
/**
* 可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优
*/
MatrixFactorizationModel model = ALS.train(training.rdd(),rank,numIterations,0.01);

在 MLlib 中的实现有如下的参数:

numBlocks 是用于并行化计算的分块个数 (设置为-1,为自动配置)。

rank 是模型中隐语义因子的个数。

iterations 是迭代的次数

lambda 是ALS的正则化参数。

implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。

alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。

可以调整这些参数,不断优化结果,使均方差变小。比如:iterations越多,lambda较小,均方差会较小,推荐结果较优。上面的例子中调用了 ALS.train(ratings, rank, numIterations, 0.01) ,我们还可以设置其他参数,调用方式如下:

 

MatrixFactorizationModel model2 = new  ALS().setRank(10)
                                            .setIterations(2000)
                                            .setLambda(0.01)
                                            .setImplicitPrefs(true)
                                            .setUserBlocks(10)
                                            .setProductBlocks(10)
                                            .run(training);
3.4、 利用模型进行预测

从 test训练集中获得只包含用户和商品的数据集 :

 

JavaRDD<Tuple2<Object, Object>>  testUsersProducts = test.map(line->{
            return new  Tuple2<>(line.user(),line.product());
});

使用训练好的推荐模型对用户商品进行预测评分,得到预测评分的数据集:

 

JavaPairRDD<Tuple2<Integer, Integer>, Double>  predictions =JavaPairRDD.fromJavaRDD(
          model.predict(testUsersProducts.rdd()).toJavaRDD().map(r->
                    new Tuple2<>(new  Tuple2<>(r.user(),r.product()),r.rating())));

将真实评分数据集与预测评分数据集进行合并。这里,Join操作类似于sql的inner join操作,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的。

 

JavaPairRDD<Tuple2<Integer, Integer>, Double>  predictions = JavaPairRDD.fromJavaRDD(
                model.predict(testUsersProducts.rdd()).toJavaRDD().map(r->
                    new Tuple2<>(new  Tuple2<>(r.user(),r.product()),r.rating())));
System.out.println(predictions);
predictions.foreach(x->{
    System.out.println(x);
});
/**
*控制台输出结果:
*rating中的第一个int是user编号,第二个int是item编号,最后的double是user对item的评分。
----------------------------
((4,2),0.010526887751696495)
((1,1),3.6826782499237716)
((1,2),0.7464017268228036)
((3,2),0.010526887751696495)
----------------------------
**/

我们把结果输出,对比一下真实结果与预测结果:

 

JavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(test.map(x->new Tuple2<>(
        new Tuple2<>(x.user(),x.product()),x.rating()))).join(predictions);

System.out.println("------------------------------------------");
ratesAndPreds.foreach(x->{
    System.out.println(x);
});
/**
*控制台输出结果:
*比如,第二条结果记录((4,2),(5.0,0.010526887751696495))中,(4,2)分别表示4号用户和2号商品,而5.0是实际的估计分值,
*0.010526887751696495是经过推荐的预测分值。
-----------------------------------
((1,1),(5.0,3.6826782499237716))
((4,2),(5.0,0.010526887751696495))
((1,2),(1.0,0.7464017268228036))
((3,2),(5.0,0.010526887751696495))
-----------------------------------
**/

比如,第二条结果记录((4,2),(5.0,0.010526887751696495))中,(4,2)分别表示4号用户和2号商品,而5.0是实际的估计分值,0.010526887751696495是经过推荐的预测分值。 然后计算均方差,这里的r1就是真实结果,r2就是预测结果:

 

double MSE = ratesAndPreds.values().mapTodouble(x->{
    double err = x._1() - x._2();
    return err * err;
}).mean();
System.out.println("Mean Squared Error = "+MSE);
/**
*控制台输出结果:
-----------------------------------
Mean Squared Error = 2.8413113799948704
-----------------------------------
**/

我们可以看到打分的均方差值为1.09左右。由于本例的数据量很少,预测的结果和实际相比有一定的差距。上面的例子只是对测试集进行了评分,我们还可以进一步的通过调用model.recommendProducts给特定的用户推荐商品以及model.recommendUsers来给特定商品推荐潜在用户

十、spark mlLib之 聚类算法

img

一、概念
1.1、定义

按照某一个特定的标准(比如距离),把一个数据集分割成不同的类或簇,使得同一个簇内的数据对象的相似性尽可能大,同时不再同一个簇内的数据对象的差异性也尽可能的大。

聚类属于典型的无监督学习(Unsupervised Learning) 方法。与监督学习(如分类器)相比,无监督学习的训练集没有人为标注的结果。在非监督式学习中,数据并不被特别标识,学习模型是为了推断出数据的一些内在结构。

1.2、主要方法

层次聚类(Hierarchical Clustering):合并法、分解法、树状图

非层次聚类:划分聚类、谱聚类

1.3、主要特征
  • 类变量的测量尺度不同,需要事先对变量标准化;

  • 类变量中如果有些变量非常相关,意味着这个变量的权重会更大

  • 欧式距离的平方是最常用的距离测量方法

  • 聚类算法要比距离测量方法对聚类结果影响更大;

  • 标准化方法影响聚类模式:

  • 变量标准化倾向产生基于数量的聚类;

  • 样本标准化倾向产生基于模式的聚类;

  • 一般聚类个数在4-6类,不易太多,或太少

二、KMeans原理

KMeans 是一个迭代求解的聚类算法,其属于 划分(Partitioning) 型的聚类方法,即首先创建K个划分,然后迭代地将样本从一个划分转移到另一个划分来改善最终聚类的质量,KMeans 的过程大致如下:

1.根据给定的k值,选取k个样本点作为初始划分中心; 2.计算所有样本点到每一个划分中心的距离,并将所有样本点划分到距离最近的划分中心; 3.计算每个划分中样本点的平均值,将其作为新的中心; 4.用计算出的中心位置重新进行聚类,如此反复循环,直到达到最大迭代次数,或划分中心的变化小于某一预定义阈值

方法的特点:

  • 通常要求已知类别数

  • 可人为指定初始位置

  • 节省运算时间

  • 样本量大于100时有必要考虑

  • 只能使用连续性变量

三、代码实现
3.1、数据集的读取

 

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
//1、获取Spark
SparkConf conf = new SparkConf().setAppName("ClusteringModel").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);


//2、读取数据
JavaRDD<String> rawData = sc.textFile("data/mllib/iris.data");


JavaRDD<Vector> trainingData = rawData.map(line->{
    String[] parts = line.split(",");
    return Vectors.dense(Double.parseDouble(parts[0]),
            Double.parseDouble(parts[1]),
            Double.parseDouble(parts[2]),
            Double.parseDouble(parts[3]));
});

本文使用模式识别领域广泛使用的UCI数据集中的鸢尾花数据Iris进行实验,Iris数据的样本容量为150,有四个实数值的特征,分别代表花朵四个部位的尺寸,以及该样本对应鸢尾花的亚种类型(共有3种亚种类型),如下所示:

 

5.1,3.5,1.4,0.2,setosa
...
5.4,3.0,4.5,1.5,versicolor
...
7.1,3.0,5.9,2.1,virginica
...
3.2、模型训练与分析

可以通过创建一个KMeans类并调用其run(RDD[Vector])方法来训练一个KMeans模型KMeansModel,在该方法调用前需要设置一系列参数,如下表所示:

| 参数 | 含义 | | ——————- | :———————: | | K | 聚类数目,认为2 | | maxIterations | 最大迭代次数认为20 | | initializationMode | 初始化模式,认为”k-means||” | | runs | 运行次数认为:1 | | initializationSteps | 初始化步数,用于KMeans||,认为5 | | epsilon | 迭代停止的阈值,认为1e-4 |

其中,每一个参数均可通过名为setXXX(…)(如maxIterations即为setMaxIterations())的方法进行设置。 由于KMeans类只有无参的构造函数,其对象创建、参数设置需要分别进行,且往往使用的只有存放模型的KMeansModel类对象,花功夫创建出的KMeans类自象本身却并未使用。故MLlib也提供了包装好的高层次方法KMeans.train(…),传入训练样本和相应的参数,即返回一个训练好的KMeansModel对象,十分方便。 该方法有4个重载形式,分别可以指定不同的输入参数,具体可以查阅MLlib的api文档,这里我们使用KMeans.train(data, k, maxIterations, runs)形式,只需要输入k值、最大迭代次数和运行次数,其他参数使用认值,如下所示:

 

KMeansModel model = KMeans.train(trainingData.rdd(),3,100,5);
//通过KMeansModel类自带的clusterCenters属性获取到模型的所有聚类中心情况

这样,模型即创建成功了。可以通过KMeansModel类自带的clusterCenters属性获取到模型的所有聚类中心情况:

 

Vector[] vectors = model.clusterCenters();
for(Vector vector : vectors) {
    System.out.println(vector);
}
/**
*控制台输出结果:
----------------------------------------------------------------------------
[5.901612903225807,2.748387096774194,4.393548387096774,1.4338709677419355]2
[5.005999999999999,3.4180000000000006,1.4640000000000002,0.2439999999999999]
[6.85,3.0736842105263147,5.742105263157893,2.071052631578947]
----------------------------------------------------------------------------
**/

也可以通过predict()方法来确定每个样本所属的聚类:

 

//通过predict()方法来确定每个样本所属的聚类:
trainingData.collect().forEach(sample->{
    int predictedClustre = model.predict(sample);
    System.out.println(sample.toString()+" belongs to cluster "+predictedClustre);
});
/**
*控制台输出结果:
---------------------------------------
[5.1,3.5,1.4,0.2] belongs to cluster 1
[4.9,3.0,1.4,0.2] belongs to cluster 1
[4.7,3.2,1.3,0.2] belongs to cluster 1
[4.6,3.1,1.5,0.2] belongs to cluster 1
[5.0,3.6,1.4,0.2] belongs to cluster 1
[5.4,3.9,1.7,0.4] belongs to cluster 1
[4.6,3.4,1.4,0.3] belongs to cluster 1
 .....
--------------------------------------
**/

同时,KMeansModel类还提供了计算 集合内误差平方和(Within Set Sum of Squared Error, WSSSE) 的方法来度量聚类的有效性:

 

double wssse = model.computeCost(trainingData.rdd());
System.out.println("集合内误差平方和:"+wssse);
/**
*控制台输出结果:
----------------------------------
集合内误差平方和:78.94084142614648
----------------------------------
**/
四、主要应用
4.1、商业

聚类分析被用来发现不同的客户群,并且通过购买模式刻画不同的客户群的特征。 聚类分析是细分市场的有效工具,同时也可用于研究消费者行为,寻找新的潜在市场、选择实验的市场,并作为多元分析的预处理。

4.2、生物

聚类分析被用来动植物分类和对基因进行分类获取对种群固有结构的认识

4.3、地理

聚类能够帮助在地球中被观察的数据库商趋于的相似性

4.4、保险行业

聚类分析通过一个高的平均消费来鉴定汽车保险单持有者的分组,同时根据住宅类型,价值,地理位置来鉴定一个城市的房产分组

4.5、因特网

聚类分析被用来在网上进行文档归类来修复信息

4.6、电子商务

聚类分析在电子商务中网站建设数据挖掘中也是很重要的一个方面,通过分组聚类出具有相似浏览行为的客户,并分析客户的共同特征,可以更好的帮助电子商务的用户了解自己的客户,向客户提供更合适的服务。

十一、spark Mllib之机器学习工作流

img

一、概念

一个典型的机器学习过程从数据收集开始,要经历多个步骤,才能得到需要的输出。这非常类似于流水线式工作,即通常会包含源数据ETL(抽取、转化、加载),数据预处理,指标提取,模型训练与交叉验证,新数据预测等步骤。

MLlib标准化了用于机器学习算法的API,从而使将多种算法组合到单个管道或工作流程中变得更加容易。 本节介绍了Pipelines API引入的关键概念,其中PipeLine(管道)概念主要受scikit-learn项目的启发。

在介绍工作流之前,我们先来了解几个重要概念:

DataFrame:使用Spark sql中的DataFrame作为ML数据集,该数据集可以保存各种数据类型。 例如,DataFrame可以具有不同的列,用于存储文本,特征向量,真实标签和预测。

Transformer:翻译成转换器,是一种算法,可以将一个DataFrame转换为另一个DataFrame。 例如,ML模型是一个Transformer,它将具有特征的DataFrame转换为具有预测的DataFrame。

Estimator:翻译成评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。例如,诸如LogisticRegression之类的学习算法是Estimator,调用fit()可以训练LogisticRegressionModel,后者是Model,因此是Transformer。

Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。

PipeLine:翻译为工作流或者管道。管道将多个“变形器”和“估计器”链接在一起,以指定ML工作流程,并获得结果输出。 例如,简单的文本文档处理工作流程可能包括几个阶段: 1、将每个文档的文本拆分为单词。 2、将每个文档的单词转换成数字特征向量。 3、使用特征向量和标签学习预测模型。 MLlib将这样的工作流表示为“管道”,它由要按特定顺序运行的一系列Pipelinestages(变压器和估计器)组成。

二、工作原理

要构建一个 Pipeline工作流,首先需要定义 Pipeline 中的各个工作流阶段Pipelinestage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织Pipelinestages 并创建一个Pipeline。比如:

 

Pipeline pipeline = new Pipeline().setStages(new  Pipelinestage[]{tokenizer,hashingTF,lr});

然后就可以把训练数据集作为输入参数,调用 Pipeline 实例的 fit 方法来开始以流的方式来处理源训练数据。这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签。更具体的说,工作流的各个阶段按顺序运行,输入的DataFrame在它通过每个阶段时被转换。 对于Transformer阶段,在DataFrame上调用transform()方法。 对于估计器阶段,调用fit()方法生成一个转换器(它成为PipelineModel的一部分或拟合的Pipeline),并且在DataFrame上调用该转换器的transform()方法

 

img

Pipeline1

上面,顶行表示具有三个阶段的流水线。 前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。 底行表示流经管线的数据,其中圆柱表示DataFrames。 在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。 Tokenizer.transform()方法将原始文本文档拆分为单词,向DataFrame添加一个带有单词的新列。 HashingTF.transform()方法将字列转换为特征向量,向这些向量添加一个新列到DataFrame。 现在,由于LogisticRegression是一个Estimator,Pipeline首先调用LogisticRegression.fit()产生一个LogisticRegressionModel。 如果流水线有更多的阶段,则在将DataFrame传递到下一个阶段之前,将在DataFrame上调用LogisticRegressionModel的transform()方法

值得注意的是,工作流本身也可以看做是一个估计器。在工作流的fit()方法运行之后,它产生一个PipelineModel,它是一个Transformer。 这个管道模型将在测试数据的时候使用。 下图说明了这种用法

img

Pipeline2

在上图中,PipelineModel具有与原始流水线相同的级数,但是原始流水线中的所有估计器都变为变换器。 当在测试数据集上调用PipelineModel的transform()方法时,数据按顺序通过拟合的工作流。 每个阶段的transform()方法更新数据集并将其传递到下一个阶段。工作流和工作流模型有助于确保培训和测试数据通过相同的特征处理步骤。

三、代码实现

以逻辑斯蒂回归为例,构建一个典型的机器学习过程,来具体介绍一下工作流是如何应用的。我们的目的是查找出所有包含”spark”的句子,即将包含”spark”的句子的标签设为1,没有”spark”的句子的标签设为0。

3.1、构建训练数据集

 

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.Pipelinestage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
SparkSession spark = SparkSession.builder().appName("MLPipelines").master("local").getorCreate();
//构建训练数据集
List<Row> data = Arrays.asList(RowFactory.create(0L, "a b c d e spark", 1.0),
                               RowFactory.create(1L, "b d", 0.0),
                               RowFactory.create(2L, "spark f g h", 1.0),
                               RowFactory.create(3L, "hadoop mapreduce", 0.0));
System.out.println(data);
/**
*控制台输出结果:
-------------------------------------------------------------------------------------
[[0,a b c d e spark,1.0], [1,b d,0.0], [2,spark f g h,1.0], [3,hadoop mapreduce,0.0]]
-------------------------------------------------------------------------------------
**/
StructType schema = new StructType(new StructField[] {
    new StructField("id",DataTypes.LongType,false,Metadata.empty()),
    new StructField("text", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
});
Dataset<Row> training = spark.createDataFrame(data,schema);
training.show(false);
/**
*控制台输出结果:
    +---+----------------+-----+
    |id |text            |label|
    +---+----------------+-----+
    |0  |a b c d e spark |1.0  |
    |1  |b d             |0.0  |
    |2  |spark f g h     |1.0  |
    |3  |hadoop mapreduce|0.0  |
    +---+----------------+-----+
**/
3.2、定义 Pipeline 中的各个工作流阶段Pipelinestage

在这一步中我们要定义 Pipeline 中的各个工作流阶段Pipelinestage,包括转换器和评估器,具体的,包含tokenizer, hashingTF和lr三个步骤。

 

Tokenizer tokenizer = new Tokenizer().setInputCol("text")
                                     .setoutputCol("words");

HashingTF hashingTF = new HashingTF().setNumFeatures(1000)
                                     .setInputCol(tokenizer.getoutputCol())
                                     .setoutputCol("features");

LogisticRegression lr = new LogisticRegression().setMaxIter(10).setRegParam(0.01);
3.3、创建一个Pipeline

有了这些处理特定问题的转换器和评估器,接下来就可以按照具体的处理逻辑有序的组织Pipelinestages 并创建一个Pipeline。

 

Pipeline pipeline = new Pipeline().setStages(new Pipelinestage[]{tokenizer,hashingTF,lr});
3.4、创建模型

现在构建的Pipeline本质上是一个Estimator,在它的fit()方法运行之后,它将产生一个PipelineModel,它是一个Transformer。

 

PipelineModel model = pipeline.fit(training);

我们可以看到,model的类型是一个PipelineModel,这个管道模型将在测试数据的时候使用。所以接下来,我们先构建测试数据。

 

List<Row> testRaw = Arrays.asList(RowFactory.create(4L, "spark i j k"),
        RowFactory.create(5L, "l m n"),
        RowFactory.create(6L, "spark a"),
        RowFactory.create(7L, "apache hadoop")
        );
Dataset<Row> test = spark.createDataFrame(testRaw,schema);
test.select("id", "text").show(false);
/**
*控制台输出结果:
    +---+-------------+
    |id |text         |
    +---+-------------+
    |4  |spark i j k  |
    |5  |l m n        |
    |6  |spark a      |
    |7  |apache hadoop|
    +---+-------------+
**/
3.5、预测结果

然后,我们调用我们训练好的PipelineModel的transform()方法,让测试数据按顺序通过拟合的工作流,生成我们所需要的预测结果。

 

model.transform(test).select("id",  "text", "probability",  "prediction").show(false);
/**
    *控制台输出结果:
   +---+--------------+----------------------------------------+----------+
   |id |text          |probability                             |prediction|
   +---+--------------+----------------------------------------+----------+
   |4  |spark i j k   |[0.540643354485232,0.45935664551476796] |0.0       |
   |5  |l m n         |[0.9334382627383527,0.06656173726164716]|0.0       |
   |6  |spark a       |[0.1504143004807332,0.8495856995192668] |1.0       |
   |7  |apache  hadoop|[0.9768636139518375,0.02313638604816238]|0.0       |
   +---+--------------+----------------------------------------+----------+
**/

通过上述结果,我们可以看到,第4句和第6句中都包含”spark”,其中第六句的预测是1,与我们希望的相符;而第4句虽然预测的依然是0,但是通过概率我们可以看到,第4句有46%的概率预测是1,而第5句、第7句分别只有7%和2%的概率预测为1,这是由于训练数据集较少,如果有更多的测试数据进行学习,预测的准确率将会有显著提升。

十二、spark mlLib之特征提取 TF-IDF

img

一、概念

“词频-逆向文件频率”(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,它可以体现一个文档中词语在语料库中的重要程度。 词语由t表示,文档由d表示,语料库由D表示。词频TF(t,d)是词语t在文档d中出现的次数文件频率DF(t,D)是包含词语的文档的个数。如果我们只使用词频来衡量重要性,很容易过度强调在文档中经常出现,却没有太多实际信息的词语,比如“a”,“the”以及“of”。如果一个词语经常出现在语料库中,意味着它并不能很好的对文档进行区分。TF-IDF就是在数值化文档信息,衡量词语能提供多少信息以区分文档。其定义如下:

img

IDF

此处|D| 是语料库中总的文档数。公式中使用log函数,当词出现在所有文档中时,它的IDF值变为0。加1是为了避免分母为0的情况。TF-IDF 度量值表示如下:

img

TF-IDF

 

在Spark ML库中,TF-IDF被分成两部分:TF (+hashing) 和 IDF。

TF: HashingTF 是一个Transformer,在文本处理中,接收词条的集合然后把这些集合转化成固定长度的特征向量。这个算法在哈希的同时会统计各个词条的词频。

IDF: IDF是一个Estimator,在一个数据集上应用它的fit()方法,产生一个IDFModel。 该IDFModel 接收特征向量(由HashingTF产生),然后计算每一个词在文档中出现的频次。IDF会减少那些在语料库中出现频率较高的词的权重。

Spark.mllib 中实现词频率统计使用特征hash的方式,原始特征通过hash函数,映射到一个索引值。后面只需要统计这些索引值的频率,就可以知道对应词的频率。这种方式避免设计一个全局1对1的词到索引的映射,这个映射在映射大量语料库时需要花费更长的时间。但需要注意,通过hash的方式可能会映射到同一个值的情况,即不同的原始特征通过Hash映射后是同一个值。为了降低这种情况出现的概率,我们只能对特征向量升维。i.e., 提高hash表的桶数,认特征维度是 2^20 = 1,048,576.

在下面的代码段中,我们以一组句子开始。首先使用分解器Tokenizer把句子划分为单个词语。对每一个句子(词袋),我们使用HashingTF将句子转换为特征向量,最后使用IDF重新调整特征向量。这种转换通常可以提高使用文本特征的性能

二、代码实现
2.1、构造文档集合

导入TFIDF所需要的包,创建一个简单的DataFrame,每一个句子代表一个文档。

 

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

//获取spark
SparkSession spark = SparkSession.builder().appName("FeatureExtractors").master("local").getorCreate();

//构造数据
List<Row> rawData = Arrays.asList(RowFactory.create(0, "I heard about Spark and I love Spark"),
        RowFactory.create(0, "I wish Java Could use case classes"),
        RowFactory.create(1, "Logistic regression models are neat")
        );


StructType schema = new StructType(new StructField[] {
        new StructField("label",DataTypes.IntegerType,false,Metadata.empty()),
        new StructField("sentence",DataTypes.StringType,false,Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(rawData,schema);
sentenceData.show(false);

输出结果:

 

+-----+-------------------------------------+
|label|sentence                             |
+-----+-------------------------------------+
|0    |I heard about Spark and I love  Spark|
|0    |I wish Java Could use case  classes  |
|1    |Logistic regression models are  neat |
+-----+-------------------------------------+
2.2、tokenizer对句子进行分词

得到文档集合后,即可用tokenizer对句子进行分词。

 

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setoutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
wordsData.show(false);

输出结果:

 

+-----+------------------------------------+---------------------------------------------+
|label|sentence                            |words                                        |
+-----+------------------------------------+---------------------------------------------+
|0    |I heard about Spark and I love Spark|[i, heard, about, spark, and, i, love, spark]|
|0    |I wish Java Could use case classes  |[i, wish, java, Could, use, case, classes]   |
|1    |Logistic regression models are neat |[logistic, regression, models, are, neat]    |
+-----+------------------------------------+---------------------------------------------+
2.3、TF把句子哈希成特征向量

得到分词后的文档序列后,即可使用HashingTF的transform()方法把句子哈希成特征向量,这里设置哈希表的桶数为2000。

 

HashingTF hashingTF = new HashingTF().setInputCol("words").setoutputCol("rawFeatures").setNumFeatures(2000);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
featurizedData.show(false);

输出结果:

 

+-----+------------------------------------+---------------------------------------------+---------------------------------------------------------------------+
|label|sentence                            |words                                        |rawFeatures                                                          |
+-----+------------------------------------+---------------------------------------------+---------------------------------------------------------------------+
|0    |I heard about Spark and I love Spark|[i, heard, about, spark, and, i, love, spark]|(2000,[240,333,1105,1329,1357,1777],[1.0,1.0,2.0,2.0,1.0,1.0])       |
|0    |I wish Java Could use case classes  |[i, wish, java, Could, use, case, classes]   |(2000,[213,342,489,495,1329,1809,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|1    |Logistic regression models are neat |[logistic, regression, models, are, neat]    |(2000,[286,695,1138,1193,1604],[1.0,1.0,1.0,1.0,1.0])                |
+-----+------------------------------------+---------------------------------------------+---------------------------------------------------------------------+

一个单词被哈希成了一个不同的索引值。以”I heard about Spark and I love Spark”为例,输出结果中2000代表哈希表的桶数,“[240,333,1105,1329,1357,1777]”分别代表着“heard, about, i, spark, and, love”的哈希值,“[1.0,1.0,2.0,2.0,1.0,1.0]”为对应单词的出现次数,无序。

2.4、IDF修正词频特征向量

可以看到,分词序列被变换成一个稀疏特征向量,其中每个单词都被散列成了一个不同的索引值,特征向量在某一维度上的值即该词汇在文档中出现的次数。 最后,使用IDF来对单纯的词频特征向量进行修正,使其更能体现不同词汇对文本的区别能力,IDF是一个Estimator,调用fit()方法并将词频向量传入,即产生一个IDFModel。

 

IDF idf = new IDF().setInputCol("rawFeatures").setoutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
2.5、得到单词对应的TF-IDF度量值

很显然,IDFModel是一个Transformer,调用它的transform()方法,即可得到每一个单词对应的TF-IDF度量值。

 

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.show(false);

输出结果:

 

+-----+------------------------------------+---------------------------------------------+---------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|sentence                            |words                                        |rawFeatures                                                          |features                                                                                                                                                                       |
+-----+------------------------------------+---------------------------------------------+---------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |I heard about Spark and I love Spark|[i, heard, about, spark, and, i, love, spark]|(2000,[240,333,1105,1329,1357,1777],[1.0,1.0,2.0,2.0,1.0,1.0])       |(2000,[240,333,1105,1329,1357,1777],[0.6931471805599453,0.6931471805599453,1.3862943611198906,0.5753641449035617,0.6931471805599453,0.6931471805599453])                       |
|0    |I wish Java Could use case classes  |[i, wish, java, Could, use, case, classes]   |(2000,[213,342,489,495,1329,1809,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|(2000,[213,342,489,495,1329,1809,1967],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453])|
|1    |Logistic regression models are neat |[logistic, regression, models, are, neat]    |(2000,[286,695,1138,1193,1604],[1.0,1.0,1.0,1.0,1.0])                |(2000,[286,695,1138,1193,1604],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                               |
+-----+------------------------------------+---------------------------------------------+---------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

 

Dataset<Row> data = rescaledData.select("features","label");
data.show(false);

输出结果:

 

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                        |label|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(2000,[240,333,1105,1329,1357,1777],[0.6931471805599453,0.6931471805599453,1.3862943611198906,0.5753641449035617,0.6931471805599453,0.6931471805599453])                        |0    |
|(2000,[213,342,489,495,1329,1809,1967],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453]) |0    |
|(2000,[286,695,1138,1193,1604],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                                |1    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+

可以看到,特征向量已经被其在语料库中出现的总次数进行了修正,通过TF-IDF得到的特征向量,在接下来可以被应用到相关的机器学习方法中。

十三、spark机器学习从0到1特征抽取–CountVectorize

 

img

一、概念

CountVectorizer 旨在通过计数来将一个文档转换为向量。当不存在先验字典时,Countvectorizer作为Estimator提取词汇进行训练,并生成一个CountVectorizerModel用于存储相应的词汇向量空间。该模型产生文档关于词语的稀疏表示,其表示可以传递给其他算法,例如LDA( Latent Dirichlet Allocation 隐含狄利克雷分布)。

在CountVectorizerModel的训练过程中,CountVectorizer将根据语料库中的词频排序从高到低进行选择,词汇表的最大含量由vocabsize超参数来指定,超参数minDF则指定词汇表中的词语至少要在多少个不同文档中出现。

二、代码实现
2.1、构造文档集合

 

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

 //获取spark
SparkSession spark = SparkSession.builder().appName("CountVectorizerModel").master("local").getorCreate();


//获取数据 DataFrame
List<Row> rawData = Arrays.asList(RowFactory.create(0, new String[] {"a", "b", "c"}),
                        RowFactory.create(1, new String[] {"a", "b", "b", "c", "a"}));
StructType schema = new StructType(new StructField[] {
        new StructField("id",DataTypes.IntegerType,false,Metadata.empty()),
        new StructField("words",new ArrayType(DataTypes.StringType,true),false,Metadata.empty())
});
Dataset<Row> data = spark.createDataFrame(rawData, schema);
data.show(false);

输出结果:

 

+---+---------------+
|id |words          |
+---+---------------+
|0  |[a, b, c]      |
|1  |[a, b, b, c, a]|
+---+---------------+
2.2、设定参数,训练模型

通过CountVectorizer设定超参数,训练一个CountVectorizerModel,这里设定词汇表的最大量为3,设定词汇表中的词至少要在2个文档中出现过,以过滤那些偶然出现的词汇。

 

CountVectorizerModel cvModel = new  CountVectorizer().setInputCol("words")
                                                        .setoutputCol("features")
                                                        .setVocabSize(3)
                                                        .setMinDF(2)
                                                        .fit(data);
String[] vocabulary =  cvModel.vocabulary();

在训练结束后,可以通过CountVectorizerModel的vocabulary成员获得到模型的词汇表。

2.3、获取文档向量

使用这一模型对DataFrame进行变换,可以得到文档的向量化表示:

 

cvModel.transform(data).show(false);

输出结果:

 

+---+---------------+-------------------------+
|id |words          |features                  |
+---+---------------+-------------------------+
|0  |[a, b, c]       |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c,  a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+

和其他Transformer不同,CountVectorizerModel可以通过指定一个先验词汇表来直接生成,如以下例子,直接指定词汇表的成员是“a”,“b”两个个词:

 

CountVectorizerModel cvm = new CountVectorizerModel(new String[] {"a", "b"}).setInputCol("words")
                                                                            .setoutputCol("features");
cvm.transform(data).show(false);

输出结果:

 

+---+---------------+-------------------+
|id |words          |features           |
+---+---------------+-------------------+
|0  |[a, b, c]      |(2,[0,1],[1.0,1.0])|
|1  |[a, b, b, c, a]|(2,[0,1],[2.0,2.0])|
+---+---------------+-------------------+

十四、spark机器学习从0到1特征抽取–Word2Vec

 

img

 

一、概念

Word2vec是一个Estimator,它采用一系列代表文档的词语来训练word2vecmodel。该模型将每个词语映射到一个固定大小的向量。word2vecmodel使用文档中每个词语的平均数来将文档转换为向量,然后这个向量可以作为预测的特征,来计算文档相似度计算等等。

二、代码实现
2.1、引包,获取spark

首先,我们引入相关包:

 

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;块

然后是获取spark

 

SparkSession spark =  SparkSession.builder().appName("Word2VecTest").master("local").getorCreate();
2.2、构建测试数据

接着呢来构建一个DataFrame,往DateFrame里加一些测试的文档信息

 

List<Row> rawData =  Arrays.asList(RowFactory.create(Arrays.asList("Hi I heard about Spark".split(","))),
                        RowFactory.create(Arrays.asList("I wish Java  Could use case classes".split(","))),
                        RowFactory.create(Arrays.asList("Logistic  regression models are neat".split(","))));
StructType schema = new StructType(new  StructField[] {
        new StructField("text",new  ArrayType(DataTypes.StringType,false),false,Metadata.empty())
});

Dataset<Row> documentDF =  spark.createDataFrame(rawData,schema);
documentDF.show(false);

我们来查看一下控制台的输出结果:

 

+-------------------------------------+
|text                                 |
+-------------------------------------+
|[Hi I heard about Spark]             |
|[I wish Java Could use case classes] |
|[Logistic regression models are neat]|
+-------------------------------------+
2.3、新建评估器,训练,转换得到向量

接下来我们新建一个Word2Vec的评估器,把单词和向量建立一个映射,设定输入为文本信息text,输出为追加列result,变量的大小为3,最小计数为0。建立完之后,用Word2Vec评估器对文档进行训练和转换,得到Dataset的数据集。

 

Word2Vec word2Vec = new  Word2Vec().setInputCol("text")
                                   .setoutputCol("result")
                                   .setVectorSize(3)
                                   .setMinCount(0);

Word2VecModel model =  word2Vec.fit(documentDF);
Dataset<Row> result =  model.transform(documentDF);
result.show(false);

我们看一下输出结果:

 

+-----------------------------------------+-------------------------------------------------------------------------------+
|text                                                        |result                                                                                                            |
+-----------------------------------------+-------------------------------------------------------------------------------+
|[Hi I heard about Spark]                       |[-0.12674053013324738,0.09846510738134384,-0.10375533252954483] |
|[I wish Java Could use case classes]      |[-0.1633371263742447,-0.14517612755298615,0.11354436725378036]   |
|[Logistic regression models are  neat] |[-0.019123395904898643,-0.13107778131961823,0.14307855069637299]|
+--------------------------------------- -+-------------------------------------------------------------------------------+

我们可以看到,通过Word2VecModel将文档转换为向量,然后这个向量可以作为预测的特征,来计算文档相似度计算啦。

十五、spark机器学习从0到1特征选择-卡方选择器

 

img

一、公式

卡方检验的基本公式,也就是χ2的计算公式,即观察值和理论值之间的偏差

 

img

卡方检验公式

其中:A 为观察值,E为理论值,k为观察值的个数,最后一个式子实际上就是具体计算的方法了 n 为总的频数,p为理论频率,那么n*p自然就是理论频数(理论值)

二、相关概念

卡方分布:可以看出当观察值和理论值十分接近的时候,也就是我们做的假设是正确的时候,χ2的值就越趋近于0,也就是说我们计算的偏差越小,那么假设值就越可能是对的,反之偏差值越大,假设值就越不准确。那么到底多大才算不准确,有没有个衡量的数值标准呢?答案是有:卡方分布。

卡方检验是以χ2分布为基础的一种常用假设检验方法。若k 个随机变量Z1、……、Zk 相互独立,且数学期望为0、方差为 1(即服从标准正态分布),则随机变量X被称为服从自由度为 k 的卡方分布,记作

 

img

卡方分布

,卡方分布的公式为:

 

img

卡方分布

自由度:自由度指的是计算某一统计量时,取值不受限制的变量个数。通常df=n-k。其中n为样本数量,k为被限制的条件数或变量个数。自由度v=(行数-1)(列数-1)。

自由度与卡方分布的关系

如图

 

img

自由度与卡方分布的关系图

图中的Freedom 这里有5条线,分别对应Freedom=1, 4, 10, 20 , 100 。这个Freedom 就是自由度,即个式子中独立变量的个数。 x 横坐标是卡方检验公式计算出来的偏差χ2,而 y 纵坐标表示假设的正确的概率。当自由度为1时,卡方分布式一个倾斜的曲线,当自由度逐渐增大是,卡方分布逐步变的平缓。在一定范围内,随着自由度越来越大,卡方分布会越来越接近正态分布。

三、利用卡方检验用来特征选择

特征选择(Feature Selection):指的是在特征向量中选择出那些“优秀”的特征,组成新的、更“精简”的特征向量的过程。它在高维数据分析中十分常用,可以剔除掉“冗余”和“无关”的特征,提升学习器的性能

特征选择方法分类方法一样,也主要分为有监督(Supervised)和无监督(Unsupervised)两种,卡方选择则是统计学上常用的一种有监督特征选择方法,它通过对特征和真实标签之间进行卡方检验,来判断该特征和真实标签的关联程度,进而确定是否对其进行选择。

对于建立模型而言并非特征越多越好,因为建模的目标是使用尽量简单的模型去实现尽量好的效果。减少一些价值小贡献小的特征有利于在表现效果不变或降低度很小的前提下,新找到最简单的模型。

那么什么样的特征是价值小的呢?想想我们之所以用机器学习的模型去学习特征,是为了更好地预测被特征影响着的应变量(标签)。那么那些根本不会对应变量产生影响,或者影响很小的特征理应事先去掉。

那么怎么判断特征对应变量的影响程度的大小呢?我们可以使用卡方检验对特征与应变量进行独立性检验,如果独立性高,那么表示两者没太大关系,特征可以舍弃;如果独立性小,两者相关性高,则说明该特征会对应变量产生比较大的影响,应当选择。

卡方检验在实际应用到特征选择中的时候,不需要知道自由度,也不用知道卡方分布,只需要根据算出来的χ2 进行排序即可,值越大越好。挑选最大的一堆,就完成了利用卡方检验来进行特征提取

四、代码实现

和ML库中的大多数学习方法一样,ML中的卡方选择也是以estimator+transformer的形式出现的,其主要由ChiSqSelector和ChiSqSelectorModel两个类来实现。

1、首先引入相关需要用的包

 

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.feature.ChiSqSelectorModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
2、接下来获取spark

 

SparkSession spark =  SparkSession.builder().appName("ChiSqSelectorTest").master("local").getorCreate();
3、然后,我们构造一个数据集DataFrame:

 

List<Row> rawData = Arrays.asList(RowFactory.create(1, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1),
                                  RowFactory.create(2, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0),
                                  RowFactory.create(3, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0));
StructType schema = new StructType(new StructField[] {
        new StructField("id",DataTypes.IntegerType,false,Metadata.empty()),    
        new StructField("features",new VectorUDT(),false,Metadata.empty()),
        new StructField("label",DataTypes.IntegerType,false,Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(rawData,schema);
df.show(false);

打印结果:

 

+---+------------------+-----+
|id |features          |label|
+---+------------------+-----+
|1  |[0.0,0.0,18.0,1.0]|1    |
|2  |[0.0,1.0,12.0,0.0]|0    |
|3  |[1.0,0.0,15.0,0.1]|0    |
+---+------------------+-----+
4、接着我们开始用卡方选择进行特征选择器的训练

 

ChiSqSelector select = new ChiSqSelector().setNumTopFeatures(1)
                                          .setFeaturesCol("features")
                                          .setLabelCol("label")
                                          .setoutputCol("selected-feature");
ChiSqSelectorModel selectModel = select.fit(df);
Dataset<Row> result = selectModel.transform(df);
result.show(false);

numTopFeatures:用来设置固定的提取特征的数量,程序会根据卡方值的高低返回前n个卡方值最高的特征。(预测能力最强的前n个特征),认选择前50个特征。这里,我们设置ChiSqSelector(卡方选择器)的numTopFeatures = 1,即在4个特征中选择处最好的1个特征。

打印结果:

 

+---+------------------+-----+----------------+
|id |features          |label|selected-feature|
+---+------------------+-----+----------------+
|1  |[0.0,0.0,18.0,1.0]|1    |[18.0]          |
|2  |[0.0,1.0,12.0,0.0]|0    |[12.0]          |
|3  |[1.0,0.0,15.0,0.1]|0    |[15.0]          |
+---+------------------+-----+----------------+

用训练出的模型对原数据集进行处理,可以看见,第三列特征被选出作为最有用的特征列。

十六、spark机器学习从0到1特征变换-标签和索引的转化

 

img

一、原理

在机器学习处理过程中,为了方便相关算法的实现,经常需要把标签数据(一般是字符串)转化成整数索引,或是在计算结束后将整数索引还原为相应的标签. Spark ML 包中提供了几个相关的转换器: StringIndexer,IndexToString,OneHotEncoder,VectorIndexer,他们提供了十分方便的特征转换功能,这些转换器都位于org.apache.spark.ml.feature包下。

值得注意的是,用于特征转换的转换器和其他的机器学习算法一样,也属于Ml Pipeline模型的一部分,可以用来构成机器学习流水线,以StringIndexer为例。

StringIndexer(字符串-索引变换)将字符串的标签编码成标签索引。标签索引序列的取值范围是[0,numLabels(字符串中所有出现的单词去掉重复的词后的总和)],按照标签出现频率排序,出现最多的标签索引为0。如果输入是数值型,我们先将数值映射到字符串,再对字符串进行索引化。如果下游的pipeline(例如:Estimator或者Transformer)需要用到索引化后的标签序列,则需要将这个pipeline的输入列名字指定为索引化序列的名字。大部分情况下,通过setInputCol设置输入的列名。

下面来具体介绍StringIndexer、IndexToString、OneHotEncoder、VectorIndexer。

二、StringIndexer(字符串-索引变换)
2.1、原理

StringIndexer将标签的字符串列编码为标签索引的列。 索引位于[0,numLabels)中,并支持四个排序选项:“frequencyDesc”:按标签频率的降序(最频繁的标签分配为0),“frequencyAsc”:按标签频率的升序(最不频繁的标签分配为0) ,“alphabetDesc”:降序字母顺序和“alphabetAsc”:升序字母顺序(认=“frequencyDesc”)。 如果用户选择保留,则看不见的标签将放置在索引numLabels处。 如果输入列为数字,则将其强制转换为字符串并为字符串值编制索引。 当下游管道组件(例如Estimator或Transformer)使用此字符串索引标签时,必须将组件的输入列设置为此字符串索引名称。 在许多情况下,可以使用setInputCol设置输入列。

2.2、代码实现

首先引入需要用的包:

 

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.OneHotEncoderEstimator;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.collection.immutable.Set;

获取spark:

 

SparkSession spark = SparkSession.builder().appName("StringIndexerTest").master("local").getorCreate();

构造一些简单数据:

 

List<Row> rowRDD = Arrays.asList(RowFactory.create(0,"a"),
                        RowFactory.create(1,"b"),
                        RowFactory.create(2,"c"),
                        RowFactory.create(3,"a"),
                        RowFactory.create(4,"a"),
                        RowFactory.create(5,"c"));

StructType schema = new StructType(new StructField[] {
        new StructField("id",DataTypes.IntegerType,false,Metadata.empty()),
        new StructField("category",DataTypes.StringType,false,Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
df.show(false);

输出结果:

 

+---+--------+
|id |category|
+---+--------+
|0  |a       |
|1  |b       |
|2  |c       |
|3  |a       |
|4  |a       |
|5  |c       |
+---+--------+

然后构建StringIndexer模型,我们创建一个StringIndexer对象,设定输入输出列名,其余参数采用认值,并对这个DataFrame进行训练,产生StringIndexerModel对象:

 

StringIndexer indexer = new StringIndexer().setInputCol("category").setoutputCol("categoryIndex");
StringIndexerModel model = indexer.fit(df);

之后我们即可利用StringIndexerModel对象对DataFrame数据进行转换操作,可以看到,认情况下,StringIndexerModel依次按照出现频率的高低,把字符标签进行了排序,即出现最多的“a”被编号成0,“c”为1,出现最少的“b”为0。

 

Dataset<Row> indexed1 = model.transform(df);
indexed1.show(false);

输出结果:

 

+---+--------+-------------+
|id |category|categoryIndex|
+---+--------+-------------+
|0  |a       |0.0          |
|1  |b       |2.0          |
|2  |c       |1.0          |
|3  |a       |0.0          |
|4  |a       |0.0          |
|5  |c       |1.0          |
+---+--------+-------------+

如果我们使用已有的数据构建了一个StringIndexerModel,然后再构建一个新的DataFrame,这个DataFrame中有着模型内未曾出现的标签“d”,用已有的模型去转换这一DataFrame会有什么效果? 实际上,如果直接转换的话,Spark会抛出异常,报出“Unseen label: d”的错误。 为了处理这种情况,在模型训练后,可以通过设置setHandleInvalid("skip")来忽略掉那些未出现的标签,这样,带有未出现标签的行将直接被过滤掉,所下所示:

 

List<Row> rowRDD2 = Arrays.asList(RowFactory.create(0,"a"),
        RowFactory.create(1,"b"),
        RowFactory.create(2,"c"),
        RowFactory.create(3,"a"),
        RowFactory.create(4,"a"),
        RowFactory.create(5,"d"));
Dataset<Row> df2 = spark.createDataFrame(rowRDD2, schema);
Dataset<Row> indexed2 = model.transform(df2);
indexed2.show(false);

输出结果:

 

Unseen label: d. 

 

Dataset<Row> indexed2 = model.setHandleInvalid("skip").transform(df2);
indexed2.show(false);

输出结果:

 

+---+--------+-------------+
|id |category|categoryIndex|
+---+--------+-------------+
|0  |a       |0.0          |
|1  |b       |2.0          |
|2  |c       |1.0          |
|3  |a       |0.0          |
|4  |a       |0.0          |
+---+--------+-------------+
三、IndexToString(索引-字符串变换)
3.1、原理

与StringIndexer对应,IndexToString将索引化标签还原成原始字符串。一个常用的场景是先通过StringIndexer产生索引化标签,然后使用索引化标签进行训练,最后再对预测结果使用IndexToString来获取其原始的标签字符串。

3.2、代码实现

首先我们用StringIndexer读取数据集中的“category”列,把字符型标签转化成标签索引,然后输出到“categoryIndex”列上,构建出一个新的DataFrame数据集

 

List<Row> rawData =  Arrays.asList(RowFactory.create(0, "a"),
                        RowFactory.create(1, "b"),
                        RowFactory.create(2, "c"),
                        RowFactory.create(3, "a"),
                        RowFactory.create(4, "a"),
                        RowFactory.create(5, "c"));

Dataset<Row> df3 = spark.createDataFrame(rawData, schema);
Dataset<Row> indexed3 = indexer.fit(df3).transform(df3);

然后我们创建IndexToString对象,读取“categoryIndex”上的标签索引,获得原有数据集的字符型标签,然后再输出到“originalCategory”列上。最后,通过输出“originalCategory”列,就可以看到数据集中原有的字符标签了。

 

IndexToString converter = new IndexToString().setInputCol("categoryIndex").setoutputCol("originalCategory");
Dataset<Row> converted3 = converter.transform(indexed3);
converted3.show(false);

输出结果:

 

+---+--------+-------------+----------------+
|id |category|categoryIndex|originalCategory|
+---+--------+-------------+----------------+
|0  |a       |0.0          |a               |
|1  |b       |2.0          |b               |
|2  |c       |1.0          |c               |
|3  |a       |0.0          |a               |
|4  |a       |0.0          |a               |
|5  |c       |1.0          |c               |
+---+--------+-------------+----------------+
四、OneHotEncoder(独热编码)
4.1、原理

独热编码(One-hot encoding)将类别特征映射为二进制向量,其中只有一个有效值(为1,其余为0)。这样在诸如Logistic回归这样需要连续数值值作为特征输入的分类器中也可以使用类别(离散)特征。

One-Hot编码适合一些期望类别特征为连续特征的算法,比如说逻辑斯蒂回归等。

4.2、代码实现

首先创建一个DataFrame,其包含一列类别性特征,需要注意的是,在使用OneHotEncoder进行转换前,DataFrame需要先使用StringIndexer将原始标签数值化

 

List<Row> rawData4 = Arrays.asList(RowFactory.create(0.0, 1.0),
          RowFactory.create(1.0, 0.0),
          RowFactory.create(2.0, 1.0),
          RowFactory.create(0.0, 2.0),
          RowFactory.create(0.0, 1.0),
          RowFactory.create(2.0, 0.0));

StructType schema4 = new StructType(new StructField[] {
        new StructField("id",DataTypes.DoubleType,false,Metadata.empty()),
        new StructField("category",DataTypes.DoubleType,false,Metadata.empty())
});

Dataset<Row> df4 = spark.createDataFrame(rawData4, schema4);

我们创建OneHotEncoder对象对处理后的DataFrame进行编码,可以看见,编码后的二进制特征呈稀疏向量形式,与StringIndexer编码的顺序相同,需注意的是最后一个Category(”b”)被编码为全0向量,若希望”b”也占有一个二进制特征,则可在创建OneHotEncoder时指定setDropLast(false)。

 

OneHotEncoderEstimator encoder = new OneHotEncoderEstimator()
                               .setInputCols(new String[] {"id","category"})
                               .setoutputCols(new String[] {"categoryVec1","categoryVec2"});
Dataset<Row> encoded4 = encoder.fit(df4).transform(df4);
encoded4.show(false);

输出结果:

 

+---+--------+-------------+-------------+
|id |category|categoryVec1 |categoryVec2 |
+---+--------+-------------+-------------+
|0.0|1.0     |(2,[0],[1.0])|(2,[1],[1.0])|
|1.0|0.0     |(2,[1],[1.0])|(2,[0],[1.0])|
|2.0|1.0     |(2,[],[])    |(2,[1],[1.0])|
|0.0|2.0     |(2,[0],[1.0])|(2,[],[])    |
|0.0|1.0     |(2,[0],[1.0])|(2,[1],[1.0])|
|2.0|0.0     |(2,[],[])    |(2,[0],[1.0])|
+---+--------+-------------+-------------+
五、VectorIndexer(向量类型索引化)
5.1、原理

VectorIndexer帮助索引Vector数据集中的分类特征。 它既可以自动确定哪些特征是分类的,又可以将原始值转换为分类索引。 具体来说,它执行以下操作:

1、设置类型为Vector的输入列和参数maxCategories。 2、根据不同值的数量确定应分类的要素,其中最多具有maxCategories的要素被声明为分类。 3、为每个分类特征计算从0开始的分类索引。 4、为分类特征建立索引,并将原始特征值转换为索引。

索引分类特征允许诸如决策树和树组合之类的算法适当地处理分类特征,从而提高性能

5.2、代码实现

首先,我们读入一个数据集DataFrame,然后使用VectorIndexer训练出模型,来决定哪些特征需要被作为类别特征,将类别特征转换为索引,这里设置maxCategories为2,即只有种类小于2的特征才被认为是类别型特征,否则被认为是连续型特征:

 

List<Row> rawData5 = Arrays.asList(RowFactory.create(Vectors.dense(-1.0, 1.0, 1.0)),
                RowFactory.create(Vectors.dense(-1.0, 3.0, 1.0)),
                RowFactory.create(Vectors.dense(0.0, 5.0, 1.0)));

StructType schema5 = new StructType(new StructField[] {
        new StructField("features",new VectorUDT(),false,Metadata.empty())
});

Dataset<Row> df5 = spark.createDataFrame(rawData5, schema5);
df5.show(false);
VectorIndexerModel indexModel = new VectorIndexer()
                                .setInputCol("features")
                                .setoutputCol("indexed")
                                .setMaxCategories(2).fit(df5);
Set<Object> categoricalFeatures = indexModel.categoryMaps().keySet();
System.out.println(categoricalFeatures.mkString(","));

输出结果:

 

0,2

 

Dataset<Row> indexed5 = indexModel.transform(df5);
indexed5.show(false);

输出结果:

 

+--------------+-------------+
|features      |indexed      |
+--------------+-------------+
|[-1.0,1.0,1.0]|[1.0,1.0,0.0]|
|[-1.0,3.0,1.0]|[1.0,3.0,0.0]|
|[0.0,5.0,1.0] |[0.0,5.0,0.0]|
+--------------+-------------+

十七、案例(一) 利用机器算法RFM模型做用户价值分析

img

一、案例背景

在产品迭代过程中,通常需要根据用户属性进行归类,也就是通过分析数据,对用户进行归类,以便于在推送及转化过程中获得更大的收益。

本案例是基于某互联网公司的实际用户购票数据为研究对象,对用户购票的时间,购买的金额进行了采集,每个用户用手机号来区别唯一性。数据分析人员根据用户购买的时间和金额,通过建立RFM模型,来计算出用户最近最近一次购买的打分,用户购买频率的打分,用户购买金额的打分,然后根据三个分数进行一个加权打分,和综合打分。业务人员可以根据用户的打分情况,对不同的用户进行个性化营销和精准营销,例如给不同的用户推送定制的营销短信,不同优惠额度的打折券等等。

通过RFM方法,可以根据用户属性数据分析,对用户进行了归类。在推送、转化等很多过程中,可以更加精准化,不至于出现用户反感的情景,更重要的是,对产品转化等商业价值也有很大的帮助。

二、RFM概念

RFM模型是衡量客户价值和客户创利能力的重要工具和手段。在众多的客户关系管理(CRM)的分析模式中,RFM模型是被广泛提到的。该机械模型通过一个客户的近期购买行为、购买的总体频率以及花了多少钱3项指标来描述该客户的价值状况。

RFM分析 就是根据客户活跃程度和交易金额的贡献,进行客户价值细分的一种方法。其中:

R(Recency):客户最近一次交易时间的间隔。R值越大,表示客户交易发生的日期越久,反之则表示客户交易发生的日期越近。

F(Frequency):客户在最近一段时间内交易的次数。F值越大,表示客户交易越频繁,反之则表示客户交易不够活跃。

M(Monetary):客户在最近一段时间内交易的金额。M值越大,表示客户价值越高,反之则表示客户价值越低。

 

img

客户价值

R打分:基于最近一次交易日期计算的得分,距离当前日期越近,得分越高。例如5分制。

F打分:基于交易频率计算的得分,交易频率越高,得分越高。如5分制。

M打分:基于交易金额计算的得分,交易金额越高,得分越高。如5分制。

RFM总分值:RFM=Rx100+Fx10+Mx1

RFM分析的主要作用:

  • 识别优质客户。可以指定个性化的沟通和营销服务,为更多的营销决策提供有力支持

  • 能够衡量客户价值和客户利润创收能力。

三、代码实现
3.1、引包

首先我们引入需要用的包,数据分析常用的numpy包,pandas包,等。

 

import time
import numpy as np
import pandas as pd
import MysqL.connector
3.2、读取数据

接下来我们开始用pd.read_csv方法读取用户的数据

 

print(time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))+':读取数据...')

config = {
    'host' : '127.0.0.1',
    'user' : 'root',
    'password' : 'test123',
    'port' : 3306,
    'database' : 'user',
    'charset' : 'gb2312'
}
cnn = MysqL.connector.connect(**config) # 建立MysqL连接
cursor = cnn.cursor() # 获得游标
sql = "SELECT  phoneNo AS PHONENO,create_date AS ORDERDATE,order_no AS ORDERNO,ROUND(pay_amount/100,2) AS PAYAMOUNT " \
      "FROM user.`event_record_order`" # sql语句
raw_data = pd.read_sql(sql,cnn,index_col='PHONENO')
cursor.close() # 关闭游标
cnn.close() # 关闭连接
print(time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))+':读取数据完毕!')
print(time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))+':开始建立RFM模型...')

介绍一下config 里的参数信息:host是数据库的ip信息,本案例用的是本地数据库,实际部署生产服务器时,改成生产的ip地址即可。user 是数据库用户名,password是密码,port是数据库的端口号,database是连接的数据库名 (schema),charset是字符集编码。

购票时间(ORDERDATE),订单号(ORDERID)是object类型,订单金额(AMOUNTINFO)是浮点类型。index_col指定了数据中用户的唯一性用 USERID来表示。

time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time())打印了当前的系统时间,用来记录日志信息。

3.3、数据审查

 

print('Data Overview :')
print(raw_data.head(4)) #打印原始数据前4条
print('-' * 30)
print('Data DESC:')
print(raw_data.describe())  #打印原始数据基本描述性信息

我们用raw_data.head(n)来指定取出数据的前几条,'-'*30是用来输出打印分隔线,下文再出现时不再重复解释,用raw_data.describe()来获得数据的基本描述性信息。输出结果:

 

Data Overview:
                      ORDERDATE               ORDERNO  PAYAMOUNT
PHONENO                                                         
135****0930 2019-10-02 13:37:36  01201910021336227979        7.0
183****1153 2019-09-30 06:22:29  0120190930062149F9AF        4.5
150****6073 2019-10-30 18:21:45  01201910301821065CFD        2.0
173****7295 2019-10-21 15:13:23  01201910211512498153        7.0
------------------------------
Data DESC:
          PAYAMOUNT
count  96323.000000
mean       4.212409
std        3.049499
min        0.000000
25%        2.600000
50%        3.600000
75%        5.000000
max       80.000000

我们看到结果中的 count表示总共的记录条数,mean表示了均值,std表示标准差,min表示最小值,25%表示下四分位,也叫第一四分位,50%表示中位值,也叫第二四分位,75%表示上四分位,也叫第三四分位。

 

na_cols = raw_data.isnull().any(axis=0) #查看每一列是否具有缺失值
print('NA Cols:')
print(na_cols)
print('-' * 30)
na_lines = raw_data.isnull().any(axis=1) #查看每一行是否具有缺失值
print('NA Records:')
print('Total number of NA lines is :{0}'.format(na_lines.sum()))  #查看具有缺失值的行总记录数
print(raw_data[na_lines])  #只查看具有缺失值的行信息

我们用raw_data.isnull()来判断是否有缺失值,其中参数axis=0表示的是列,axis=1表示的是行,用:{0}'.format()的方式在字符串中传入参数。输出结果:

 

NA Cols:
ORDERDATE    False
ORDERNO      False
PAYAMOUNT    False
dtype: bool
------------------------------
NA Records:
Total number of NA lines is :0
Empty DataFrame
Columns: [ORDERDATE, ORDERNO, PAYAMOUNT]
Index: []

通过结果可以看到,实际的交易用户数据还是比较完整的,没有缺失数据的情况,可能这批数据被技术人员采集过来已经处理过了,不讨论了。如果数据有缺失的情况怎么办?那就要对缺失的数据进行一个预处理。

3.4、数据预处理

数据预处理,包括数据异常,格式转换,单位转化(如果有单位不统一的情况)等。

我们先来看异常值处理:

 

sales_data = raw_data.dropna() #丢弃带有缺失值的行记录
sales_data = sales_data[sales_data['PAYAMOUNT'] > 1]

这里,我用代码去除了小于1元的订单,正常出行连1块钱都不用,那应该是测试数据了,现在谁出门做个公交还不得1元起步。对于用户有缺失值的记录进行了丢弃,当然也可以用其他的方法,例如平均值补全法。

然后看日期格式转换:

 

sales_data['ORDERDATE'] = pd.to_datetime(sales_data['ORDERDATE'])
print('Raw Dtype:')
print(sales_data.dtypes)

用pd.to_datetime()方法用户的订单日期进行了格式化转换。输出结果:

 

Raw Dtype:
ORDERDATE    datetime64[ns]
ORDERNO              object
PAYAMOUNT           float64
dtype: object

最后看数据转换:

 

recency_value = sales_data['ORDERDATE'].groupby(sales_data.index).max()  #计算原始最近一次购买时间
frequency_value = sales_data['ORDERDATE'].groupby(sales_data.index).count()    #计算原始订单数
monetray_value = sales_data['PAYAMOUNT'].groupby(sales_data.index).sum()  #计算原始订单总金额

这里根据订单日期的聚合运算得到了用户的最近一次购买时间,用户总的购买数,和购买金额,max()得到了购买时间,count()得到了购买数量,sum()得到了购买金额。

3.5、计算RFM得分

得到了最近的购买时间,购买数,和购买金额,下面就可以开始计算RFM得分了。

 

deadline_date = pd.datetime(2019,11,15)
r_interval = (deadline_date - recency_value).dt.days
r_score = pd.cut(r_interval,5,labels=[5,4,3,2,1])
f_score = pd.cut(frequency_value,5,labels=[1,2,3,4,5])
m_score = pd.cut(monetray_value,5,labels=[1,2,3,4,5])

我们又把客户分成五等分,这个五等分分析相当于是一个“忠诚度的阶梯”(loyalty ladder),如购买一次的客户为新客户,购买两次的客户为潜力客户,购买三次的客户为老客户,购买四次的客户为成熟客户,购买五次及以上则为忠实客户。其诀窍在于让消费者一直顺着阶梯往上爬,把销售想象成是要将两次购买的顾客往上推成三次购买的顾客,把一次购买者变成两次的。

我们用deadline_date来表示分析的截止日期,那么统计用户的时间范围就是从数据中最早开始的购买时间到deadline_date。

用pandas.series.dt.days可以对操作后的datatime直接进行取数。pandas.cut用来把一组数据分割成离散的区间。

简单介绍一下pandas.cut的用法

 

pandas.cut(x, bins, right=True, labels=None, retbins=False, precision=3, include_lowest=False, duplicates='raise')
  • x:被切分的类数组(array-like)数据,必须是1维的(不能用DataFrame);

  • bins

    :bins是被切割后的区间(或者叫“桶”、“箱”、“面元”),有3中形式:一个int型的标量、标量序列(数组)或者pandas.IntervalIndex 。

    • 一个int型的标量,当bins为一个int型的标量时,代表将x平分成bins份。x的范围在每侧扩展0.1%,以包括x的最大值和最小值。

    • 标量序列,标量序列定义了被分割后每一个bin的区间边缘,此时x没有扩展。

    • pandas.IntervalIndex,定义要使用的精确区间。

  • right:bool型参数,认为True,表示是否包含区间右部。比如如果bins=[1,2,3],right=True,则区间为(1,2],(2,3];right=False,则区间为(1,2),(2,3)。

  • labels:给分割后的bins打标签,比如把年龄x分割成年龄段bins后,可以给年龄段打上诸如青年、中年的标签。labels的长度必须和划分后的区间长度相等,比如bins=[1,2,3],划分后有2个区间(1,2],(2,3],则labels的长度必须为2。如果指定labels=False,则返回x中的数据在第几个bin中(从0开始)。

  • retbins:bool型的参数,表示是否将分割后的bins返回,当bins为一个int型的标量时比较有用,这样可以得到划分后的区间,认为False。

  • precision:保留区间小数点的位数,认为3.

  • include_lowest:bool型的参数,表示区间的左边是开还是闭的,认为false,也就是不包含区间左部(闭)。

  • duplicates:是否允许重复区间。有两种选择:raise:不允许,drop:允许。

重点理解我标粗的几个参数,其他参数有需要用到时查阅。

RFM数据合并

 

rfm_list = [r_score,f_score,m_score]  #将r、f、m三个维度组成列表
rfm_cols = ['r_score','f_score','m_score'] #设置r、f、m 三个维度列名
rfm_pd = pd.DataFrame(np.array(rfm_list).transpose(),dtype=np.int32,columns=rfm_cols,index=frequency_value.index) #建立r、f、m数据框

我们把RFM的数据进行了合并,首先是将r、f、m三个维度组成一个列表,然后取了三个列名,把数据,列名组装成一个数据框DataFrame.

 

print('RFM score Overview:')
print(rfm_pd.head(4))

输出结果:

 

RFM score Overview:
             r_score  f_score  m_score
PHONENO                               
13001055088        4        1        1
13001061903        4        1        1
13001066446        5        1        1
13001123218        4        1        1

 

rfm_pd['rfm_wscore'] = rfm_pd['r_score'] * 0.6 + rfm_pd['f_score'] * 0.3 + rfm_pd['m_score'] * 0.1
rfm_pd_tmp = rfm_pd.copy()
rfm_pd_tmp['r_score'] = rfm_pd_tmp['r_score'].astype('str')
rfm_pd_tmp['f_score'] = rfm_pd_tmp['f_score'].astype('str')
rfm_pd_tmp['m_score'] = rfm_pd_tmp['m_score'].astype('str')
rfm_pd['rfm_comb'] = rfm_pd_tmp['r_score'].str.cat(rfm_pd_tmp['f_score']).str.cat(rfm_pd_tmp['m_score'])

理论上,上一次消费时间越近的顾客应该是比较好的顾客,对提供即时的商品或是服务也最有可能会有反应。营销人员若想业绩有所成长,只能靠偷取竞争对手的市场占有率,而如果要密切地注意消费者的购买行为,那么最近的一次消费就是营销人员第一个要利用的工具。历史显示,如果我们能让消费者购买,他们就会持续购买。这也就是为什么,0至3个月的顾客收到营销人员的沟通信息多于3至6个月的顾客。

这里,对RFM进行了加权打分,R占60%,F占30%,M占10%,当然也可以根据业务的实际情况进行相应的权重调整。综合打分是根据RFM=R100+F10+M*1。

3.6、保存结果

 

print('Final RFM score Overview:')
print(rfm_pd.head(4))
print('-'*30)
print('Final RFM score DESC:')
print(rfm_pd.describe())

rfm_pd.to_csv('sales_rfm_score.csv')

输出结果:

 

Final RFM score Overview:
             r_score  f_score  m_score  rfm_wscore rfm_comb
PHONENO                                                    
13001055088        4        1        1         2.8      411
13001061903        4        1        1         2.8      411
13001066446        5        1        1         3.4      511
13001123218        4        1        1         2.8      411
------------------------------
Final RFM score DESC:
            r_score       f_score       m_score    rfm_wscore
count  53064.000000  53064.000000  53064.000000  53064.000000
mean       3.732172      1.006407      1.002148      2.641441
std        0.944452      0.113022      0.055212      0.570417
min        1.000000      1.000000      1.000000      1.000000
25%        3.000000      1.000000      1.000000      2.200000
50%        4.000000      1.000000      1.000000      2.800000
75%        5.000000      1.000000      1.000000      3.400000
3.7、写入数据库

建立数据库连接

 

table_name = 'sale_rfm_score'
#数据框基本信息
config = {
    'host' : '172.0.0.1',
    'user' : 'root',
    'password' : 'test123',
    'port' : 3306,
    'database' : 'skpda',
    'charset' : 'gb2312'
}
con = MysqL.connector.connect(**config)
cursor = con.cursor()

cursor.execute("show tables")  #
table_object = cursor.fetchall()  # 通过fetchall方法获得所有数据
table_list = []  # 创建库列表
for t in table_object:  # 循环读出所有库
    table_list.append(t[0])  # 每个每个库追加到列表
if not table_name in table_list:  # 如果目标表没有创建
    cursor.execute('''
    CREATE TABLE %s (
    phone_no               VARCHAR(20),
    r_score               int(2),
    f_score              int(2),
    m_score              int(2),
    rfm_wscore              DECIMAL(10,2),
    rfm_comb              VARCHAR(10),
    create_date              VARCHAR(20)
    )ENGINE=InnoDB DEFAULT CHARSET=gb2312
    ''' % table_name)  # 创建新表
print(time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))+ ':开始清除 table {0}的历史数据...'.format(table_name)) # 输出开始清历史数据的提示信息
delete_sql = 'truncate table {0}'.format(table_name)
cursor.execute(delete_sql)
print(time.strftime('%Y-%m-%d %H:%M:%s',time.localtime(time.time()))+ ':清除 table {0}的历史数据完毕!'.format(table_name)) # 输出清除历史数据完毕的提示信息

连接的参数不再介绍,上文已经介绍过。通过fetchall方法获得所有数据,读出所有的表,如果没有表则创建。用cursor.execute先执行truncate语句,把表中的信息先清除,然后重新写入数据。

将数据写入数据库

 

phone_no = rfm_pd.index # 索引列
rfm_wscore = rfm_pd['rfm_wscore']  #RFM 加权得分列
rfm_comb = rfm_pd['rfm_comb']  #RFM组合得分列
timestamp = time.strftime('%Y-%m-%d',time.localtime(time.time())) # 写库日期
print('开始写入数据库表 {0}'.format(table_name)) # 输出开始写库的提示信息
for i in range(rfm_pd.shape[0]):
    insert_sql = "INSERT INTO `%s` VALUES ('%s',%s,%s,%s,%s,%s,'%s')" % \
                 (table_name, phone_no[i], r_score.iloc[i], f_score.iloc[i], m_score.iloc[i], rfm_wscore.iloc[i],
                  rfm_comb.iloc[i], timestamp)  # 写库sql依据
    cursor.execute(insert_sql)
    con.commit()
cursor.close()
con.close()
print('写入数据库结束,总记录条数为: %d' %(i+1))

先从数据集合 rfm_pd (rfm_pd 是一个DataFrame)中获取到rfm的每个字段, ’....{0}'.format(table_name)表示的是在字符串中拼接参数,{0}代表一个字符串占位符。

四、案例结果分析

根据RFM模型的建立,我们在数据库生成了数据。

 

img

数据库生成

然后前段工程师根据数据库里的数据得到了用户RFM的价值打分页面,如图(后台展示页面)。

运营人员根据页面的打分情况来衡量客户价值和客户创利能力,了解客户差异。将客户分别按照R、F、M参数分组后,假设某个客户同时属于R5、F4、M3三个组,则可以得到该客户的RFM代码543。同理,我们可以推测,有一些客户刚刚成功交易、且交易频率高、总采购金额大,其RFM代码是555,还有一些客户的RFM代码是554、545……每一个RFM代码都对应着一小组客户,开展市场营销活动的时候可以从中挑选出若干组进行。

 

img

后台展示页面

用户是根据RFM的打分倒序排列,可以直接找到重点客户的信息,点开手机号,查看客户的详细信息(这一步由前端开发人员实现),针对重点客户展开各种个性化营销。

 

img

重点客户详细信息

RFM三个指标每个维度再细分出5份,这样就能够细分出5x5x5=125类用户,再根据每类用户精准营销……显然125类用户已超出普通人脑的计算范畴了,更别说针对125类用户量体定制营销策略。实际运用上,我们只需要把每个维度做一次两分即可,这样在3个维度上我们依然得到了8组用户

这样,就可以得到以下解读(编号次序RFM,1代表高,0代表低) 重要价值客户(111):最近消费时间近、消费频次和消费金额都很高,必须是VIP啊! 重要保持客户(011):最近消费时间较远,但消费频次和金额都很高,说明这是个一段时间没来的忠诚客户,我们需要主动和他保持联系。 重要发展客户(101):最近消费时间较近、消费金额高,但频次不高,忠诚度不高,很有潜力的用户,必须重点发展。 重要挽留客户(001):最近消费时间较远、消费频次不高,但消费金额高的用户,可能是将要流失或者已经要流失的用户,应当基于挽留措施。

案例结论:

  • 表现处于一般水平以上的用户的比例太小,低于1%(R、F、M三个维度得分均在3以上的用户数),VIP客户太少。

  • 会员中99%以上的客户消费状态都不容乐观,主要体现在消费频率低R、消费总金额低M。这可能跟公司的地铁出行的业务有关系,公司的业务分布在全国中小城市,大部分用户都是使用一次的用户

  • 低价值客户有262个,占总比例的 0.4%,运营人员可以导出下载这批用户

第五章、机器学习算法面试

机器学习面试问题2019

区分机器学习和深度学习

机器学习是人工智能的一个子集,它为机器提供了自动学习和改进的能力,无需任何明确的编程。而深度学习,机器学习的子集,能够做出直觉决策的人工神经网络。

你对Recall和Precision这个术语有什么了解?

召回被称为真正的正面率。它指的是您的模型声明的阳性数量与整个数据中可用阳性数量的比较。精度,或者称为正预测值,基于预测。它是模型声称的准确阳性数量的测量值,与模型实际声明的阳性数量相比较。

监督机器学习和无监督机器学习之间的区别?

在监督学习中,机器在标记数据的帮助下进行训练,即用正确答案标记的数据。而在无监督机器学习中,模型通过自己发现信息来学习。与监督学习模型相比,无监督模型更适合于执行困难的处理任务。

什么是K-means和KNN

K-means是一种无监督算法,用于聚类问题的过程,KNN或K最近邻是一种监督算法,用于回归和分类过程。

什么使分类与回归不同

这两个概念都是监督机器学习技术的一个重要方面。通过分类输出分类为用于进行预测的不同类别。而回归模型通常用于找出预测和变量之间的关系。分类和回归之间的关键区别在于,在前者中,输出变量是离散的,而在后者中是连续的。

您将如何处理数据集中的缺失数据?

数据科学家面临的最大挑战之一是数据丢失问题。您可以通过多种方式对缺失值进行归因,包括分配唯一类别,删除行,使用均值/中值/模式替换,使用支持缺失值的算法,以及预测缺失值等等。

您对归纳逻辑编程(ILP)有何了解?

机器学习的子领域,归纳逻辑编程通过使用逻辑编程来开发预测模型来搜索数据中的模式。该过程假定逻辑程序是假设或背景知识。

您需要采取哪些步骤来确保不会过度使用特定型号?

当模型在训练期间提供大量数据时,它开始从数据集中的噪声和其他错误数据中学习。这使得模型难以学习除了训练集之外概括新实例。有三种方法可以避免机器学习中的过度拟合。第一种方法是保持模型简单,第二种方法是使用交叉验证技术,第三种方法是使用正则化技术,例如LASSO。

什么是合奏学习?

或者,集合方法被称为学习多分类器系统或基于委员会的学习。集合方法是指构建分类器集的学习算法,然后对新数据点进行分类以选择其预测。该方法训练了许多假设以解决相同的问题。集合建模的最佳示例是随机森林树,其中许多决策树用于预测结果。

命名机器学习项目中所需的步骤?

实现良好工作模型应采取的一些关键步骤是收集数据,准备数据,选择机器学习模型,模型训练,评估模型,调整参数,最后是预测。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐