Hadoop | Double 11 E-commerce Data Analysis on Hadoop

1. Experiment Platform

CentOS

1. Login

Platform accounts and passwords
$ hadoop/hadoop
$ root/hadoop01

2. Change the IP address

# vi /etc/sysconfig/network-scripts/ifcfg-enp0s3
IPADDR=192.168.43.73
NETMASK=255.255.255.0
GATEWAY=192.168.43.1
# vi /etc/hosts
$ add the following line
192.168.43.73 bp01
$ restart the network service
# service network restart

3. Verify that the VM and the host can ping each other

*** If the host can ping the VM but the VM cannot ping the host: in the Windows 10 firewall, open Advanced settings → Inbound Rules, find the "File and Printer Sharing (Echo Request – ICMPv4-In)" rule whose profile is "Public", and set it to Allow.

4. Start the Hadoop platform

$ start the Hadoop platform
# start-all.sh

If the browser can open http://192.168.43.73:50070/ and http://192.168.43.73:8088/, the platform started successfully.
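As an extra check on the node itself, jps should list the HDFS and YARN daemons; the expected process names below are a minimal sketch assuming a single-node setup.

$ jps   # expect NameNode, SecondaryNameNode, DataNode, ResourceManager and NodeManager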

Windows

1. Version

Windows 7, JDK 1.8

2. Hosts

Edit the hosts file in C:/Windows/System32/drivers/etc/ so that Windows can resolve the cluster hostname.
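The entry mirrors the one added on the CentOS side; a minimal sketch (adjust the IP and hostname to your own setup):

# append to C:/Windows/System32/drivers/etc/hosts
192.168.43.73 bp01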

2. Experiment Data

1. Dataset

Three files: the user behavior log user_log.csv, the repeat-buyer training set train.csv, and the repeat-buyer test set test.csv.

1. User behavior log user_log.csv; its fields are defined as follows:

  1. user_id | buyer ID
  2. item_id | item ID
  3. cat_id | item category ID
  4. merchant_id | seller ID
  5. brand_id | brand ID
  6. month | month of the transaction
  7. day | day of the transaction
  8. action | behavior, in {0,1,2,3}: 0 = click, 1 = add to cart, 2 = purchase, 3 = follow the item
  9. age_range | buyer age bracket: 1 = under 18, 2 = [18,24], 3 = [25,29], 4 = [30,34], 5 = [35,39], 6 = [40,49], 7 and 8 = 50 and above, 0 and NULL = unknown
  10. gender | 0 = female, 1 = male, 2 and NULL = unknown
  11. province | province of the shipping address

2. Repeat-buyer training set train.csv and test set test.csv; their fields are defined as follows (a quick check of the label column is sketched after the list):

  1. user_id | buyer ID
  2. age_range | buyer age bracket: 1 = under 18, 2 = [18,24], 3 = [25,29], 4 = [30,34], 5 = [35,39], 6 = [40,49], 7 and 8 = 50 and above, 0 and NULL = unknown
  3. gender | 0 = female, 1 = male, 2 and NULL = unknown
  4. merchant_id | merchant ID
  5. label | whether the buyer is a repeat buyer: 0 = not a repeat buyer, 1 = repeat buyer, -1 = the user falls outside the prediction scope. NULL only appears in the test set, where it marks the value to be predicted.
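Once the files have been uploaded to the VM (next section), the label column can be sanity-checked with a one-liner; a minimal sketch, assuming the CSVs sit in /usr/data and still contain their header row:

# count how often each label value occurs in the training set
$ awk -F',' 'NR>1 {print $5}' /usr/data/train.csv | sort | uniq -c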

2. Data Upload

1. Create a folder on HDFS (skipped in this experiment)

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * This class creates a directory on HDFS.
 *
 * @author Ben
 */
public class MakeDir {

    public static int BUFFER_SIZE = 4096;

    public static void main(String[] args) throws Exception {
        URI uri = new URI("hdfs://192.168.43.73:9000");
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        FileSystem fs = FileSystem.get(uri, conf);
        // create the data folder under the HDFS root
        Path srcPath = new Path("/data");
        boolean isok = fs.mkdirs(srcPath);
        if (isok) {
            System.out.println("Folder creation success!");
        } else {
            System.out.println("Folder creation failed!");
        }
        fs.close();
    }
}

2. Upload the data to CentOS

Upload the data to /usr/data via WinSCP.
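If WinSCP is not at hand, scp from the Windows command line works just as well; a sketch assuming the same host and target directory:

# copy the archive to the VM over SSH
$ scp data_format.zip hadoop@192.168.43.73:/usr/data/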

3. Upload the data to HDFS (skipped in this experiment)

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * This class uploads a file to HDFS.
 *
 * @author Ben
 */
public class Upload {

    public static int BUFFER_SIZE = 4096;

    public static void main(String[] args) throws Exception {
        URI uri = new URI("hdfs://192.168.43.73:9000");
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        FileSystem fs = FileSystem.get(uri, conf);

        Path srcPath = new Path("C:/Users/Ben/Desktop/data/data_format.zip"); // source path
        Path dstPath = new Path("/data/"); // destination path
        // copy the local file to HDFS; the first argument controls whether the source is deleted (true = delete, default false)
        fs.copyFromLocalFile(false, srcPath, dstPath);

        // print the files now present under the destination path
        System.out.println("Upload to " + conf.get("fs.default.name"));
        System.out.println("------------list files------------" + "\n");
        FileStatus[] fileStatus = fs.listStatus(dstPath);
        for (FileStatus file : fileStatus) {
            System.out.println(file.getPath());
        }
        fs.close();
    }
}

3. Unzip the file

[root@bp01 ~]# cd /usr/data
[root@bp01 data]# ls
data_format.zip
# give the hadoop user ownership of /usr/data
[root@bp01 data]# sudo chown -R hadoop:hadoop /usr/data
[root@bp01 data]# su hadoop
[hadoop@bp01 ~]$ cd /usr/data
[hadoop@bp01 data]$ unzip data_format.zip
[hadoop@bp01 data]$ ls
data_format.zip test.csv train.csv user_log.csv
# show the first 3 lines of the extracted file
[hadoop@bp01 data]$ head -3 user_log.csv
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西

4. Data Preprocessing

1. Remove the header row

# delete the first line (the header row)
[hadoop@bp01 data]$ sed -i '1d' user_log.csv
[hadoop@bp01 data]$ head -3 user_log.csv
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西

2. Count the lines in the file

# count how many lines the file contains
[root@bp01 data]# wc -l user_log.csv
54925330 user_log.csv

3. Extract a subset of the data

[root@bp01 data]# vi interception.sh
Contents of interception.sh:
#!/bin/bash
# the first argument given to interception.sh is used as the input file name
infile=$1
# the second argument is used as the output file name
outfile=$2
# note: the trailing $infile > $outfile must come right after the closing }' characters
awk -F "," 'BEGIN{
      id=0;
    }
    {
        if($6==11 && $7==11){
            id=id+1;
            print $1","$2","$3","$4","$5","$6","$7","$8","$9","$10","$11
            if(id==100000){
                exit
            }
        }
    }' $infile > $outfile
[root@bp01 data]# ls
data_format.zip interception.sh test.csv train.csv user_log.csv
[root@bp01 data]# chmod +x ./interception.sh
[root@bp01 data]# ./interception.sh ./user_log.csv ./mall_user_data.csv
[root@bp01 data]# ls
data_format.zip interception.sh mall_user_data.csv test.csv train.csv user_log.csv
[root@bp01 data]# wc -l mall_user_data.csv
100000 mall_user_data.csv

3. Hive Experiment

Starting the Platform

Starting MySQL

[root@bp01 ~]# beeline
beeline> !connect jdbc:mysql://192.168.43.122:3306 root !@#123Qaz

Starting Hive

# start the services
!!! Switch to the hadoop user before starting Hive !!!
[root@bp01 ~]# hive --service metastore & hive --service hiveserver2 &
[root@bp01 ~]# beeline
beeline> !connect jdbc:hive2://192.168.43.122:10000 hive !@#123Qaz
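If the beeline connection fails, it is worth checking that HiveServer2 is actually listening on port 10000; a minimal sketch (netstat may require the net-tools package on CentOS 7):

$ netstat -tnlp | grep 10000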

Data Upload

0: jdbc:hive2://192.168.43.73:10000> create database mall;
No rows affected (0.194 seconds)
0: jdbc:hive2://192.168.43.73:10000> show databases;
-- create an external table
CREATE EXTERNAL TABLE mall.user_log(
    user_id INT,
    item_id INT,
    cat_id INT,
    merchant_id INT,
    brand_id INT,
    month STRING,
    day STRING,
    action INT,
    age_range INT,
    gender INT,
    province STRING)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS TEXTFILE LOCATION '/malldata/';
0: jdbc:hive2://192.168.43.73:10000> use mall;
0: jdbc:hive2://192.168.43.73:10000> show tables;

At this point, http://192.168.43.73:50070/explorer.html#/ shows a malldata folder.

# upload the data
[hadoop@bp01 root]$ hdfs dfs -put /usr/data/mall_user_data.csv /malldata
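To confirm the upload without opening the web UI, list the external table's location directly; a minimal sketch:

[hadoop@bp01 root]$ hdfs dfs -ls /malldata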
-- query the data
0: jdbc:hive2://192.168.43.73:10000> select * from user_log limit 10;
-- show the table structure and details
0: jdbc:hive2://192.168.43.73:10000> desc formatted user_log;
-- table schema
0: jdbc:hive2://192.168.43.73:10000> desc user_log;
+--------------+------------+----------+--+
| col_name | data_type | comment |
+--------------+------------+----------+--+
| user_id | int | |
| item_id | int | |
| cat_id | int | |
| merchant_id | int | |
| brand_id | int | |
| month | string | |
| day | string | |
| action | int | |
| age_range | int | |
| gender | int | |
| province | string | |
+--------------+------------+----------+--+

Data Queries

1. Use the aggregate function count() to count the rows in the table

0: jdbc:hive2://192.168.43.73:10000> select count(*) from user_log;
+---------+--+
| c0 |
+---------+--+
| 100000 |
+---------+--+

2. Count users by gender

0: jdbc:hive2://192.168.43.73:10000> select gender,count(*) num from user_log group by gender;
+---------+--------+--+
| gender | num |
+---------+--------+--+
| 0 | 33232 |
| 1 | 33215 |
| 2 | 33553 |
+---------+--------+--+

3. Count records by action type

0: jdbc:hive2://192.168.43.73:10000> select action,count(*) num from user_log group by action;
+---------+--------+--+
| action | num |
+---------+--------+--+
| 0 | 86357 |
| 1 | 68 |
| 2 | 12476 |
| 3 | 1099 |
+---------+--------+--+

4. Compare age brackets across genders

0: jdbc:hive2://192.168.43.73:10000> select gender,age_range,count(*) num from user_log group by gender,age_range;

5. Count actions by province

0: jdbc:hive2://192.168.43.73:10000> select province,count(*) num from user_log group by province;

6. Count completed purchases by province

0: jdbc:hive2://192.168.43.73:10000> select province,count(*) num from user_log where action = 2 group by province;

7. Top 5 product categories by visits

0: jdbc:hive2://192.168.43.73:10000> select cat_id,count(*) num from user_log group by cat_id order by num desc limit 5;

Inserting Query Results into Temporary Tables

1. User counts by gender (interim_gender)

0: jdbc:hive2://192.168.43.73:10000> 
create table interim_gender(gender INT,amount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
0: jdbc:hive2://192.168.43.73:10000> insert overwrite table interim_gender select gender,count(*) num from user_log group by gender;
0: jdbc:hive2://192.168.43.73:10000> select * from interim_gender;

2. Counts by buyer action (interim_action)

0: jdbc:hive2://192.168.43.73:10000> 
create table interim_action(action INT,amount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
0: jdbc:hive2://192.168.43.73:10000> insert overwrite table interim_action select action,count(*) num from user_log group by action;
0: jdbc:hive2://192.168.43.73:10000> select * from interim_action;

3. Age brackets by gender (interim_gender_age)

0: jdbc:hive2://192.168.43.73:10000> 
create table interim_gender_age(gender INT,age_range INT,amount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
0: jdbc:hive2://192.168.43.73:10000> insert overwrite table interim_gender_age select gender,age_range,count(*) num from user_log group by gender,age_range;
0: jdbc:hive2://192.168.43.73:10000> select * from interim_gender_age;

4. Top 10 categories by visits (interim_views)

0: jdbc:hive2://192.168.43.73:10000> 
create table interim_views(cat_id INT,amount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
0: jdbc:hive2://192.168.43.73:10000> insert overwrite table interim_views select cat_id,count(*) num from user_log group by cat_id order by num desc limit 10;
0: jdbc:hive2://192.168.43.73:10000> select * from interim_views;

5. Top 10 categories by purchases (interim_buy)

0: jdbc:hive2://192.168.43.73:10000> 
create table interim_buy(cat_id INT,amount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
0: jdbc:hive2://192.168.43.73:10000> insert overwrite table interim_buy select cat_id,count(*) num from user_log where action=2 group by cat_id order by num desc limit 10;
0: jdbc:hive2://192.168.43.73:10000> select * from interim_buy;

6. Completed purchases by province (interim_province_buy)

0: jdbc:hive2://192.168.43.73:10000> 
create table interim_province_buy(province STRING,amount INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
0: jdbc:hive2://192.168.43.73:10000> insert overwrite table interim_province_buy select province,count(*) num from user_log where action=2 group by province;
0: jdbc:hive2://192.168.43.73:10000> select * from interim_province_buy;

Exporting to MySQL with Sqoop

1. MySQL

0: jdbc:mysql://192.168.43.73:3306> create database mall;
0: jdbc:mysql://192.168.43.73:3306> use mall;
-- check the character set
0: jdbc:mysql://192.168.43.73:3306> show variables like "char%";
0: jdbc:mysql://192.168.43.73:3306> set character_set_server=utf8;

2. Export the gender data

[hadoop@bp01 sqoop-1.4.7]$ hadoop fs -cat /user/hive/warehouse/mall.db/interim_gender/000000_0
0 33232
1 33215
2 33553
0: jdbc:mysql://192.168.43.122:3306> CREATE TABLE `mall`.`gender` (`gender` varchar(10),`amount` varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[hadoop@bp01 sqoop-1.4.7]$ sqoop export --connect jdbc:mysql://192.168.43.122:3306/mall --username root --password '!@#123Qaz' --table gender --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/mall.db/interim_gender' -m 1;
0: jdbc:mysql://192.168.43.122:3306> select * from gender;

3. Export action

0: jdbc:mysql://192.168.43.122:3306> CREATE TABLE `mall`.`action` (`action` varchar(10),`amount` varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[hadoop@bp01 sqoop-1.4.7]$ sqoop export --connect jdbc:mysql://192.168.43.122:3306/mall --username root --password '!@#123Qaz' --table action --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/mall.db/interim_action' -m 1;
0: jdbc:mysql://192.168.43.122:3306> select * from action;

4. Export gender_age

0: jdbc:mysql://192.168.43.122:3306> CREATE TABLE `mall`.`gender_age` (`gender` varchar(10),`age_range` varchar(10),`amount` varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[hadoop@bp01 sqoop-1.4.7]$ sqoop export --connect jdbc:mysql://192.168.43.122:3306/mall --username root --password '!@#123Qaz' --table gender_age --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/mall.db/interim_gender_age' -m 1;
0: jdbc:mysql://192.168.43.122:3306> select * from gender_age;

5. Export views

0: jdbc:mysql://192.168.43.122:3306> CREATE TABLE `mall`.`views` (`cat_id` varchar(20),`amount` varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[hadoop@bp01 sqoop-1.4.7]$ sqoop export --connect jdbc:mysql://192.168.43.122:3306/mall --username root --password '!@#123Qaz' --table views --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/mall.db/interim_views' -m 1;
0: jdbc:mysql://192.168.43.122:3306> select * from views;

6. Export buy

0: jdbc:mysql://192.168.43.122:3306> CREATE TABLE `mall`.`buy` (`cat_id` varchar(20),`amount` varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[hadoop@bp01 sqoop-1.4.7]$ sqoop export --connect jdbc:mysql://192.168.43.122:3306/mall --username root --password '!@#123Qaz' --table buy --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/mall.db/interim_buy' -m 1;
0: jdbc:mysql://192.168.43.122:3306> select * from buy;

7. Export province_buy

0: jdbc:mysql://192.168.43.122:3306> CREATE TABLE `mall`.`province_buy` (`province` varchar(10),`amount` varchar(20)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[hadoop@bp01 sqoop-1.4.7]$ sqoop export --connect jdbc:mysql://192.168.43.122:3306/mall --username root --password '!@#123Qaz' --table province_buy --fields-terminated-by '\t' --export-dir '/user/hive/warehouse/mall.db/interim_province_buy' -m 1;
0: jdbc:mysql://192.168.43.122:3306> select * from province_buy;

Explanation of the flags:
sqoop export  # copy data from Hive (HDFS) into MySQL
--connect jdbc:mysql://localhost:3306/mall
--username root  # MySQL login user
--password ***  # login password
--table user_log  # the MySQL table the data is loaded into
--export-dir '/user/hive/warehouse/mall.db/interim_province_buy'  # the Hive data files on HDFS
--fields-terminated-by '\t'  # field delimiter used in the exported Hive files

Errors

1. Error: Permission denied: user=hive, access=EXECUTE, inode="/tmp":hadoop:supergroup:drwx------
   Fix: hadoop fs -chown -R hive:hive /tmp
2. Error: The ownership on the staging directory /tmp/hadoop-yarn/staging/hadoop/.staging is not as expected. It is owned by hive. The directory must be owned by the submitter hadoop or by hadoop
   Fix: hadoop fs -chown -R hadoop:hadoop /tmp

4. Spark

[root@bp01 ~]# cd /usr/data
[root@bp01 data]# vi interception_test.sh
#!/bin/bash
# the first argument given to interception_test.sh is used as the input file name
infile=$1
# the second argument is used as the output file name
outfile=$2
# note: the trailing $infile > $outfile must come right after the closing }' characters
awk -F "," 'BEGIN{
      id=0;
    }
    {
        if($1 && $2 && $3 && $4 && !$5){
            id=id+1;
            print $1","$2","$3","$4","1
            if(id==10000){
                exit
            }
        }
    }' $infile > $outfile
[root@bp01 data]# ls
data_format.zip interception.sh interception_test.sh mall_user_data.csv test.csv train.csv user_log.csv
[root@bp01 data]# chmod +x ./interception_test.sh
[root@bp01 data]# ./interception_test.sh ./test.csv ./mall_test.csv
[root@bp01 data]# wc -l mall_test.csv
10000 mall_test.csv
[root@bp01 data]# sed -i '1d' train.csv
[root@bp01 data]# vi interception_train.sh
#!/bin/bash
# the first argument given to interception_train.sh is used as the input file name
infile=$1
# the second argument is used as the output file name
outfile=$2
# note: the trailing $infile > $outfile must come right after the closing }' characters
awk -F "," 'BEGIN{
         id=0;
    }
    {
        if($1 && $2 && $3 && $4 && ($5!=-1)){
            id=id+1;
            print $1","$2","$3","$4","$5
            if(id==10000){
                exit
            }
        }
    }' $infile > $outfile
[root@bp01 data]# chmod +x ./interception_train.sh
[root@bp01 data]# ./interception_train.sh ./train.csv ./mall_train.csv
[root@bp01 data]# wc -l mall_train.csv
10000 mall_train.csv
./bin/spark-shell --jars /opt/hadoop/spark-2.3.1/jars/mysql-connector-java-5.1.40-bin.jar --driver-class-path /opt/hadoop/spark-2.3.1/jars/mysql-connector-java-5.1.40-bin.jar
[hadoop@bp01 spark-2.3.1]$ spark-shell --jars /usr/local/spark/jars/mysql-connector-java-5.1.46-bin.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.46-bin.jar
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// load the preprocessed training and test data
val train_data = sc.textFile("/usr/data/mall_train.csv")
val test_data = sc.textFile("/usr/data/mall_test.csv")
val train = train_data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(4).toDouble, Vectors.dense(parts(1).toDouble, parts(2).toDouble, parts(3).toDouble))
}
val test = test_data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(4).toDouble, Vectors.dense(parts(1).toDouble, parts(2).toDouble, parts(3).toDouble))
}
// train a linear SVM with SGD
val numIterations = 1000
val model = SVMWithSGD.train(train, numIterations)
// clear the threshold so predict() returns raw scores
model.clearThreshold()
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  score + " " + point.label
}
scoreAndLabels.foreach(println)

// set a threshold of 0.0 so predict() returns 0/1 class labels
model.setThreshold(0.0)
scoreAndLabels.foreach(println)

model.clearThreshold()
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  score + " " + point.label
}
// build the repeat-buyer result records
val rebuyRDD = scoreAndLabels.map(_.split(" "))
// define the schema
val schema = StructType(List(StructField("score", StringType, true), StructField("label", StringType, true)))
// create a Row object for each record in rebuyRDD
val rowRDD = rebuyRDD.map(p => Row(p(0).trim, p(1).trim))
// bind the rows to the schema to build a DataFrame
val rebuyDF = spark.createDataFrame(rowRDD, schema)
// JDBC connection properties
val prop = new Properties()
prop.put("user", "root") // MySQL user name
prop.put("password", "!@#123Qaz") // MySQL password
prop.put("driver", "com.mysql.jdbc.Driver") // JDBC driver class
// connect to MySQL in append mode, adding the records to the rebuy table in the mall database
rebuyDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/mall", "mall.rebuy", prop)
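After the write completes, the results can be spot-checked from the shell; a minimal sketch, assuming the mysql command-line client is installed on the host the Spark job pointed at:

$ mysql -u root -p -e "select * from mall.rebuy limit 10;"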

Reference: http://dblab.xmu.edu.cn/blog/1363-2/

Author: Ben
Link: https://smallbenxiong.github.io/2019/12/26/20191226-%E7%94%B5%E5%95%86%E5%B9%B3%E5%8F%B0%E5%8F%8C11%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90/
Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit Ben Blog when reposting.