Problem Statement

I have a JSON column in my DataFrame.

  • The JSON is in string format.
  • It is a nested JSON.
  • It is a large string.
  • I do not know the schema and want to avoid defining it manually.
  • All the JSONs follow the same schema definition.

To extract anything from it, I first need to parse it into a struct. How do I convert the JSON string column into a struct column?

Solution

Here is the solution if you are short on time. In the next section, I discuss it in more detail.

# Spark 3.2.1 | Scala 2.12
import pyspark.sql.functions as F

# Sample json we will work with.
sample_json = """
{
  "lvl1":  {
    "lvl2a": {
      "lvl3a":   {
        "lvl4a": "random_data",
        "lvl4b": "random_data"
      }
    },
    "lvl2b":   {
      "lvl3a":   {
        "lvl4a": "random_data"
      },
      "lvl3b":  [
        {"lvl4a": "random_data"},
        {"lvl4b": "random_data"}
      ]
    }
  }
}
"""

# Spark dataframe with json column
df = spark.createDataFrame([(sample_json,)]*4, ["json_data"])

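# Determine the schema from one sample row.
# schema_of_json needs a plain string here, not a column.
json_schema = F.schema_of_json(df.select(F.col("json_data")).first()[0])

# Convert the JSON string column to a struct column using the inferred schema.
df = df.withColumn("json_data_struct", F.from_json("json_data", json_schema))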

Details

We will use pyspark.sql.functions.schema_of_json to do our dirty work of determining the schema.

I expected this function to accept a column, like any other column-based function, so I tried the following:

df = df.withColumn("sch", F.schema_of_json(F.col("json_data")))

It threw the following error:

AnalysisException: cannot resolve 'schema_of_json(json_data)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got json_data.;
...

I did not know what a foldable string was. The data type of the json_data column was already string. ChatGPT also suggested using the function this way. :)

The documentation and multiple Stack Overflow answers [1, 2, 3] helped me reach an explanation.

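schema_of_json needs a single literal string rather than a column. A "foldable" expression is one that Spark can evaluate to a constant while analyzing the query, before it reads any rows, and a column reference does not qualify. So I extracted one JSON string from the column and passed it to the function. This is how I did it: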

json_string = df.select(F.col("json_data")).first()[0]
json_schema = F.schema_of_json(json_string)

The end.