设为首页 - 加入收藏 哈尔滨站长网 (http://www.0451zz.com)- 国内知名站长资讯网站,提供最新最全的站长资讯,创业经验,网站建设等!
热搜: vivo 2018 face javascript
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

使用Spark Streaming SQL进行PV/UV统计

发布时间:2019-10-20 08:14 所属栏目:[教程] 来源:ligh-rain
导读:1.背景介绍 PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

1.背景介绍

PV/UV统计是流式分析一个常见的场景。通过PV可以对访问的网站做流量或热点分析,例如广告主可以通过PV值预估投放广告网页所带来的流量以及广告收入。另外一些场景需要对访问的用户作分析,比如分析用户的网页点击行为,此时就需要对UV做统计。

使用Spark Streaming SQL,并结合Redis可以很方便进行PV/UV的统计。本文将介绍通过Streaming SQL消费Loghub中存储的用户访问信息,对过去1分钟内的数据进行PV/UV统计,将结果存入Redis中。

2.准备工作

  • 创建E-MapReduce 3.23.0以上版本的Hadoop集群。
  • 下载并编译E-MapReduce-SDK包
  1. git?clone?git@github.com:aliyun/aliyun-emapreduce-sdk.git?
  2. cd?aliyun-emapreduce-sdk?
  3. git?checkout?-b?master-2.x?origin/master-2.x?
  4. mvn?clean?package?-DskipTests?

编译完后, assembly/target目录下会生成emr-datasources_shaded_${version}.jar,其中${version}为sdk的版本。

数据源

本文采用Loghub作为数据源,有关日志采集、日志解析请参考日志服务。

3.统计PV/UV

一般场景下需要将统计出的PV/UV以及相应的统计时间存入Redis。其他一些业务场景中,也会只保存最新结果,用新的结果不断覆盖更新旧的数据。以下首先介绍第一种情况的操作流程。

3.1启动客户端

命令行启动streaming-sql客户端

  1. streaming-sql?--master?yarn-client?--num-executors?2?--executor-memory?2g?--executor-cores?2?--jars?emr-datasources_shaded_2.11-${version}.jar?--driver-class-path?emr-datasources_shaded_2.11-${version}.jar?

也可以创建SQL语句文件,通过streaming-sql -f的方式运行。

3.1定义数据表

数据源表定义如下

  1. CREATE?TABLE?loghub_source(user_ip?STRING,?__time__?TIMESTAMP)??
  2. USING?loghub??
  3. OPTIONS(?
  4. sls.project=${sls.project},?
  5. sls.store=${sls.store},?
  6. access.key.id=${access.key.id},?
  7. access.key.secret=${access.key.secret},?
  8. endpoint=${endpoint});?

其中,数据源表包含user_ip和__time__两个字段,分别代表用户的IP地址和loghub上的时间列。OPTIONS中配置项的值根据实际配置。

结果表定义如下

  1. CREATE?TABLE?redis_sink??
  2. USING?redis??
  3. OPTIONS(?
  4. table='statistic_info',?
  5. host=${redis_host},?
  6. key.column='interval');?

其中,statistic_info为Redis存储结果的表名,interval对应统计结果中的interval字段;配置项${redis_host}的值根据实际配置。

3.2创建流作业

  1. CREATE?SCAN?loghub_scan?
  2. ON?loghub_source?
  3. USING?STREAM?
  4. OPTIONS(?
  5. watermark.column='__time__',?
  6. watermark.delayThreshold='10?second');?
  7. ?
  8. CREATE?STREAM?job?
  9. OPTIONS(?
  10. checkpointLocation=${checkpoint_location})?
  11. INSERT?INTO?redis_sink?
  12. SELECT?COUNT(user_ip)?AS?pv,?approx_count_distinct(?user_ip)?AS?uv,?window.end?AS?interval?
  13. FROM?loghub_scan?
  14. GROUP?BY?TUMBLING(__time__,?interval?1?minute),?window;?

4.3查看统计结果

最终的统计结果如下图所示

使用Spark Streaming SQL进行PV/UV统计

可以看到,每隔一分钟都会生成一条数据,key的形式为表名:interval,value为pv和uv的值。

3.4实现覆盖更新

将结果表的配置项key.column修改为一个固定的值,例如定义如下

  1. CREATE?TABLE?redis_sink?
  2. USING?redis??
  3. OPTIONS(?
  4. table='statistic_info',?
  5. host=${redis_host},?
  6. key.column='statistic_type');?

创建流作业的SQL改为

  1. CREATE?STREAM?job?
  2. OPTIONS(?
  3. checkpointLocation='/tmp/spark-test/checkpoint')?
  4. INSERT?INTO?redis_sink?
  5. SELECT?"PV_UV"?as?statistic_type,COUNT(user_ip)?AS?pv,?approx_count_distinct(?user_ip)?AS?uv,?window.end?AS?interval?
  6. FROM?loghub_scan?
  7. GROUP?BY?TUMBLING(__time__,?interval?1?minute),?window;?

最终的统计结果如下图所示

使用Spark Streaming SQL进行PV/UV统计

可以看到,Redis中值保留了一个值,这个值每分钟都被更新,value包含pv、uv和interval的值。

4.总结

本文简要介绍了使用Streaming SQL结合Redis实现流式处理中统计PV/UV的需求。后续文章,我将介绍Spark Streaming SQL的更多内容。

【免责声明】本站内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。

网友评论
推荐文章