Spark SQL轻松读取JSON数据:从基础到实践的完整指南
在数据处理领域,JSON(JavaScript Object Notation)因其灵活、易读的特性,已成为主流的数据交换格式之一,Apache Spark作为大数据处理的利器,提供了强大的JSON数据读取能力,尤其在Spark SQL模块中,通过内置的spark.read接口,可以轻松实现JSON数据的加载与解析,本文将详细介绍Spark读取JSON数据的多种方法、核心参数、常见问题及解决方案,帮助读者从入门到熟练这一技能。
Spark读取JSON数据的基础方法
Spark提供了两种主要的JSON读取方式:读取JSON文件(适用于存储在HDFS、本地文件系统、S3等分布式文件系统中的JSON文件)和读取JSON字符串(适用于内存中的JSON数据)。
读取JSON文件
Spark的DataFrameReader是读取结构化数据的入口,通过spark.read获取实例后,调用json()方法即可读取JSON文件,假设有一个名为users.json的本地文件,内容如下:
{"id": 1, "name": "Alice", "age": 25, "city": "New York"}
{"id": 2, "name": "Bob", "age": 30, "city": "San Francisco"}
{"id": 3, "name": "Charlie", "age": 35, "city": "Chicago"}
示例代码(PySpark)
from pyspark.sql import SparkSession
# 创建SparkSession
spark = SparkSession.builder \
.appName("JSONReadingExample") \
.getOrCreate()
# 读取JSON文件
df = spark.read.json("path/to/users.json")
# 显示数据
df.show()
df.printSchema()
输出结果
+---+--------+---+-------------+
|age| city| id| name|
+---+--------+---+-------------+
| 25|New York| 1| Alice|
| 30|San Fran| 2| Bob|
| 35| Chicago| 3| Charlie|
+---+--------+---+-------------+
root
|-- age: long (nullable = true)
|-- city: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
关键说明
- 路径支持:
path参数可以是文件路径(如"/path/to/file.json")、目录路径(如"/path/to/directory/",会读取目录下所有JSON文件),或是通配符路径(如"/path/to/*.json")。 - 自动推断Schema:Spark会读取JSON文件的部分数据(默认采样100行),自动推断字段名和数据类型,若JSON数据结构复杂(如嵌套对象、数组),推断结果可能不准确,需通过
schema参数手动指定。
读取JSON字符串(内存数据)
如果JSON数据以字符串形式存在于内存中(如从API获取或变量定义),可通过spark.read.json直接传入字符串列表或RDD。
示例代码(PySpark)
json_strings = [
'{"id": 4, "name": "David", "age": 28, "city": "Seattle"}',
'{"id": 5, "name": "Eve", "age": 32, "city": "Boston"}'
]
# 从字符串列表读取DataFrame
df_from_strings = spark.read.json(json_strings)
df_from_strings.show()
输出结果
+---+-----+---+--------+
|age| city| id| name|
+---+-----+---+--------+
| 28|Seattle| 4| David|
| 32| Boston| 5| Eve|
+---+-----+---+--------+
核心参数与配置
Spark读取JSON数据时,可通过DataFrameReader的多个参数控制读取行为,以下是常用参数的说明:
schema:手动指定Schema
当JSON数据结构复杂(如嵌套、字段类型不一致)或自动推断不准确时,需手动指定Schema,Schema可通过StructType和StructField定义。
示例代码(手动指定Schema)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# 定义Schema
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("address", StructType([
StructField("city", StringType(), True),
StructField("zip", StringType(), True)
]), True)
])
# 读取JSON文件并应用Schema
df_with_schema = spark.read.schema(schema).json("path/to/nested_users.json")
df_with_schema.printSchema()
说明
StructType:表示Schema的结构,包含多个StructField。StructField:定义字段名、数据类型(如IntegerType、StringType)和是否可空(nullable=True允许为空)。
multiLine:处理多行JSON文件
默认情况下,spark.read.json要求每行是一个独立的JSON对象(即JSON Lines格式,.jsonl),若文件是标准的JSON格式(包含嵌套的大括号,多行组成一个对象),需设置multiLine=True。
示例(多行JSON文件)
[
{"id": 1, "name": "Alice", "hobbies": ["reading", "hiking"]},
{"id": 2, "name": "Bob", "hobbies": ["coding", "gaming"]}
]
读取代码
df_multi_line = spark.read.option("multiLine", "true").json("path/to/multi_line.json")
df_multi_line.show()
输出结果
+---+-----+----------------+
| id| name| hobbies|
+---+-----+----------------+
| 1|Alice|[reading, hiking]|
| 2| Bob|[coding, gaming]|
+---+-----+----------------+
primitivesAsString:控制原始数据类型
默认情况下,JSON中的数字会被推断为LongType或DoubleType,字符串为StringType,若希望所有原始类型均作为字符串处理(避免类型转换问题),可设置primitivesAsString=True。
示例
df_as_string = spark.read.option("primitivesAsString", "true").json("path/to/users.json")
df_as_string.printSchema()
输出(数字字段转为字符串)
root
|-- age: string (nullable = true)
|-- city: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
allowUnquotedFieldNames:允许非引数字段名
标准JSON要求字段名用双引号包裹,但某些场景下可能存在非引数字段名(如{id: 1, name: "Alice"}),通过allowUnquotedFieldNames=True可放宽此限制。
mode:处理异常数据的模式
当JSON数据格式不规范时(如缺少字段、类型不匹配),可通过mode参数指定处理策略,常用值包括:
PERMISSIVE(默认):保留异常数据,将无效字段值设为null,并在_corrupt_record字段中记录原始数据。DROPMALFORMED:直接丢弃格式不正确的行。FAILFAST:遇到异常数据时直接抛出异常。
示例(PERMISSIVE模式)
# 包含异常数据的JSON文件(第二行缺少"age"字段)
{"id": 1, "name": "Alice", "age": 25}
{"id": 2, "name": "Bob"} # 缺少age
{"id": 3, "age": "thirty"} # age类型为字符串
df_permissive = spark.read.json("path/to/malformed.json")
df_permissive.show()
输出结果(_corrupt_record记录异常行)
+----+-----+---+--------------------+
| age| name| id| _corrupt_record|
+----+-----+---+--------------------+
| 25|Alice| 1|{"id": 1, "name":...|
|null| Bob| 2|{"id": 2, "name":...|
|null| null| 3|{"id": 3, "age": "...|
+----+-----+---+--------------------+
复杂JSON数据的读取与处理
实际场景中,JSON数据常包含嵌套对象、数组等复杂数据结构,Spark需结合schema定义和函数解析(如from_json)处理此类数据。
读取嵌套JSON
假设有一个嵌套JSON文件nested.json:
{"id": 1, "


还没有评论,来说两句吧...