基于Spark的大规模日志分析

基于,spark,大规模,日志,分析 · 浏览次数 : 175

小编点评

**大规模日志分析的步骤和代码示例** **1. 数据来源** * 读取原始日志数据,包含 IP、时间、URL 和用户代理信息。 **2. 数据清洗** * 填充缺失值。 * 去除不合法的记录和重复记录。 * 解析日期和时间格式。 **3. 数据统计** * 统计每个 URL 的访问量。 * 按访问量降序排序。 **4. 数据可视化** * 使用 Matplotlib 库绘制图表,显示前 10 个访问量最高的 URL 地址及其访问量。 **代码示例** ```python import spark import matplotlib.pyplot as plt # 读取原始日志数据 originalRdd = spark.sparkContext.textFile("path/to/logfile") # 对数据进行清洗 filteredRdd = originalRdd.filter( line=["\\d{4}-\\d{2}-\\d{2}", "\\d{2}:\\d{2}:\\d{2}", "\\d+\"", "\\d+\"", "\\d+\"", "\\d+\""].all() ) # 统计每个 URL 的访问量 df = spark.createDataFrame(filteredRdd).toDF("timestamp", "request", "responseCode") totalCount = df.count() errorsCount = df.filter(df["responseCode"] > 400).count() successCount = totalCount - errorsCount # 排序并显示前 10 个访问量最高的 URL 地址及其访问量 topEndpoints = df.groupBy("request").count().orderBy(desc("count")).limit(10) topEndpoints.show() # 绘制图表 plt.bar(range(10), counts, align="center") plt.xticks(range(10), urls, rotation=90) plt.xlabel("Url") plt.ylabel("Count") plt.title("Top 10 Url") plt.show() ``` **其他说明** * 使用 `spark.sql.functions`模块中的统计函数进行分析。 * 可根据实际需求调整数据清洗的条件和参数。 * 使用 matplotlib 库进行可视化。

正文

摘要:本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。

本文分享自华为云社区《【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】》,作者:上进小菜猪。

随着互联网的普及和应用范围的扩大,越来越多的应用场景需要对海量数据进行高效地处理和分析,这就要求我们必须具备大数据技术方面的知识和技能。本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。

1.数据来源

我们的项目是针对某购物网站的访问日志进行分析,其中主要包含以下几个字段:

  • IP:访问的客户端 IP 地址
  • Time:访问时间
  • Url:访问的 URL 地址
  • User-Agent:浏览器标识符

原始数据规模约为 100GB,我们需要对其进行清洗、统计和分析,以得到有用的信息和价值。

2. 数据清洗

由于原始数据存在缺失值、异常值、重复值等问题,因此我们需要进行数据清洗,主要包括以下步骤:

  1. 将原始数据进行格式转换,方便后续处理
  2. 对 IP、Time、Url 和 User-Agent 字段进行解析和提取
  3. 去除不合法的记录和重复的记录

具体代码实现如下:

import org.apache.spark.{SparkConf, SparkContext}
import java.text.SimpleDateFormat
import java.util.Locale
​
object DataCleaning {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataCleaning")
    val sc = new SparkContext(conf)
    val data = sc.textFile("hdfs://master:9000/log/access.log")
​
 // 定义时间格式及地区信息
    val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
​
 // 数据清洗
    val cleanData = data.map(line => {
      val arr = line.split(" ")
 if (arr.length >= 9) {
 // 解析 IP
        val ip = arr(0)
​
 // 解析时间,转换为 Unix 时间戳
        val time = dateFormat.parse(arr(3) + " " + arr(4)).getTime / 1000// 解析 URL
        val url = urlDecode(arr(6))
​
 // 解析 UserAgent
        val ua = arr(8)
​
 (ip, time, url, ua)
 }
 }).filter(x => x != null).distinct()
​
 // 结果输出
    cleanData.saveAsTextFile("hdfs://master:9000/cleanData")
​
    sc.stop()
 }
​
 // URL 解码
  def urlDecode(url: String): String = {
    java.net.URLDecoder.decode(url, "utf-8")
 }
}

3. 数据统计

对于大规模数据的处理,我们可以使用 Spark 提供的强大的分布式计算能力,以提高处理效率和减少计算时间。

我们这里使用 Spark SQL 统计每个 URL 的访问量,并输出前 10 个访问量最高的 URL,代码如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
​
case class LogRecord(ip: String, time: Long, url: String, ua: String)
​
object DataAnalysis {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataAnalysis")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
​
 // 读取清洗后的数据
    val cleanData = sc.textFile("hdfs://master:9000/cleanData").filter(x => x != null)
​
 // 将数据转换为 DataFrame
 import sqlContext.implicits._
    val logDF = cleanData.map(_.split(",")).map(p => LogRecord(p(0), p(1).toLong, p(2), p(3))).toDF()
​
 // 统计每个 URL 的访问量,并按访问量降序排序
    val topUrls = logDF.groupBy("url").count().sort($"count".desc)
​
 // 输出前 10 个访问量最高的 URL
    topUrls.take(10).foreach(println)
​
    sc.stop()
 }
}

4. 数据可视化

数据可视化是将处理和分析后的数据以图表或图像的方式展示出来,有利于我们直观地观察数据的规律和趋势。

我们这里采用 Python 的 Matplotlib 库将前 10 个访问量最高的 URL 可视化,代码如下:

import matplotlib.pyplot as plt
​
# 读取数据
with open('topUrls.txt', 'r') as f:
    line = f.readline()
    urls = []
    counts = []
 while line and len(urls) < 10:
        url, count = line.strip().split(',')
        urls.append(url)
        counts.append(int(count))
        line = f.readline()
# 绘制直方图
plt.bar(range(10), counts, align='center')
plt.xticks(range(10), urls, rotation=90)
plt.xlabel('Url')
plt.ylabel('Count')
plt.title('Top 10 Url')
plt.show()

在进行数据清洗前,需要先对原始日志数据进行筛选,选取需要分析的字段。然后进行数据清洗,去掉不必要的空格、特殊字符等,使数据更加规整,并增加可读性。

下面是数据清洗的代码示例:

val originalRdd = spark.sparkContext.textFile("path/to/logfile")
​
val filteredRdd = originalRdd.filter(line => {
  val tokens = line.split("\t")
  tokens.length >= 10 &&
 tokens(0).matches("\d{4}-\d{2}-\d{2}") &&
 tokens(1).matches("\d{2}:\d{2}:\d{2}") &&
 tokens(2).matches("\d+") &&
 tokens(3).matches("\d+") &&
 tokens(4).matches("\d+") &&
 tokens(5).matches("\d+") &&
 tokens(6).matches(".+") &&
 tokens(7).matches(".+") &&
 tokens(8).matches(".+") &&
 tokens(9).matches(".+")
})
​
val cleanedRdd = filteredRdd.map(line => {
  val tokens = line.split("\t")
  val timestamp = s"${tokens(0)} ${tokens(1)}"
  val request = tokens(6).replaceAll(""", "")
  val responseCode = tokens(8).toInt
 (timestamp, request, responseCode)
})

​在上述代码中,我们首先读取原始日志数据,并使用filter函数过滤掉不符合条件的行;然后使用map函数将数据转换为元组的形式,并进行清洗。其中,元组的三个元素分别是时间戳、请求内容和响应状态码。

接下来,让我们来介绍一下如何使用Spark进行数据统计。

数据统计是大规模数据分析中非常重要的一个环节。Spark提供了丰富的聚合函数,可用于对数据进行各种统计分析。

下面是对清洗后的数据进行统计分析的代码示例:

import org.apache.spark.sql.functions._
​
val df = spark.createDataFrame(cleanedRdd).toDF("timestamp", "request", "responseCode")
val totalCount = df.count()
val errorsCount = df.filter(col("responseCode") >= 400).count()
val successCount = totalCount - errorsCount
val topEndpoints = df.groupBy("request").count().orderBy(desc("count")).limit(10)
topEndpoints.show()

在上面的代码中,我们首先将清洗后的数据转换为DataFrame,然后使用count函数计算总记录数和错误记录数,并计算成功记录数。最后使用groupBy和orderBy函数按照请求内容,对数据进行分组统计,并打印出请求次数最多的前10个端点。

通过可视化,我们可以清楚地看到前 10 个访问量最高的 URL 地址及其访问量,这对于进一步分析和优化网站的性能和用户体验具有重要的意义。

总结起来,这就是我们的一个大数据实战项目,我们使用 Spark 统计了购物网站的访问量,并通过 Python 的 Matplotlib 库将结果可视化。这个过程中,我们运用了数据清洗、Spark SQL 统计和可视化等技术,为大规模数据的处理和分析提供了有效的解决方案。

 

点击关注,第一时间了解华为云新鲜技术~

与基于Spark的大规模日志分析相似的内容:

基于Spark的大规模日志分析

摘要:本篇文章将从一个实际项目出发,分享如何使用 Spark 进行大规模日志分析,并通过代码演示加深读者的理解。 本文分享自华为云社区《【实战经验分享】基于Spark的大规模日志分析【上进小菜猪大数据系列】》,作者:上进小菜猪。 随着互联网的普及和应用范围的扩大,越来越多的应用场景需要对海量数据进行

大数据怎么学?对大数据开发领域及岗位的详细解读,完整理解大数据开发领域技术体系

经常有小伙伴和我咨询大数据怎么学,我觉得有必要写一下关于大数据开发的具体方向,下次就不用苦哈哈的打字回复了。直接回复文章。 1.大数据岗位划分 我们通常说的大数据开发主要分为三大方向: 1.1数据平台开发工程师 主要从事后端开发,结合Hadoop,flink,spark等做二次开发,基于底层框架开发

基于卷积神经网络的MAE自监督方法

本文介绍ICLR2023的方法Spark,实现了基于CNN的MAE。

SparkCore

SparkCore RDD基础 定义 ​ 在 Spark 的编程接口中,每一个数据集都被表示为一个对象,称为 RDD。RDD 是 Resillient Distributed Dataset(弹性分布式数据集)的简称,是一个只读的(不可变的)、分区的(分布式的)、容错的、延迟计算的、类型推断的和可缓

基于 Three.js 的 3D 模型加载优化

作为一个3D的项目,从用户打开页面到最终模型的渲染加载的时间也会比普通的H5项目要更长一些,从而造成大量的用户流失。为了提升首屏加载的转化率,需要尽可能的降低loading的时间。这里就分享一些我们在模型加载优化方面的心得。

基于MindSpore实现BERT对话情绪识别

本文分享自华为云社区《【昇思25天学习打卡营打卡指南-第二十四天】基于 MindSpore 实现 BERT 对话情绪识别》,作者:JeffDing。 模型简介 BERT全称是来自变换器的双向编码器表征量(Bidirectional Encoder Representations from Trans

基于 Vagrant 手动部署多个 Redis Server

环境准备 宿主机环境:Windows 10 虚拟机环境:Vagrant + VirtualBox Vagrantfile 配置 首先,我们需要编写一个 Vagrantfile 来定义我们的虚拟机配置。假设已经在 D:\Vagrant\redis 目录下创建了一个 Vagrantfile,其内容如下:

基于EF Core存储的Serilog持久化服务

前言 Serilog是 .NET 上的一个原生结构化高性能日志库,这个库能实现一些比内置库更高度的定制。日志持久化是其中一个非常重要的功能,生产环境通常很难挂接调试器或者某些bug的触发条件很奇怪。为了在脱离调试环境的情况下尽可能保留更多线索来辅助解决生产问题,持久化的日志就显得很重要了。目前Ser

基于EF Core存储的国际化服务

前言 .NET 官方有一个用来管理国际化资源的扩展包Microsoft.Extensions.Localization,ASP.NET Core也用这个来实现国际化功能。但是这个包的翻译数据是使用resx资源文件来管理的,这就意味着无法动态管理。虽然官方有在文档中提供了一些第三方管理方案,但是都不太

基于FileZilla上传、下载服务器数据的方法

本文介绍FileZilla软件的下载、配置与使用方法。 在之前的博客中,我们提到了下载高分遥感影像数据需要用到FTP(文件传输协议,File Transfer Protocol)软件FileZilla;这一软件用以在自己的电脑与服务器之间相互传输数据,在进行下载科学数据、网站开发等等操作时,经常需要