Spaces:
Build error
Build error
# jsonrepair.py - Repair invalid JSON documents in Python
#
# A port of https://github.com/josdejong/jsonrepair from TypeScript to Python.
#
# This port won't get updates, because the goal should be to generate this library instead.
#
# See: https://github.com/josdejong/jsonrepair/issues/84
#
import json | |
import re | |
from typing import Optional | |
# Control characters that must not appear raw inside a JSON string, mapped to
# the two-character escape sequence written to the repaired output instead.
CONTROL_CHARACTERS = {
    "\b": "\\b",
    "\f": "\\f",
    "\n": "\\n",
    "\r": "\\r",
    "\t": "\\t",
}

# Characters that may follow a backslash inside a JSON string, mapped to the
# character the escape sequence stands for.
# Note that \u is handled separately in JsonRepair.parse_string().
ESCAPE_CHARACTERS = {
    '"': '"',
    "\\": "\\",
    "/": "/",
    "b": "\b",
    "f": "\f",
    "n": "\n",
    "r": "\r",
    "t": "\t",
}
def remove_at_index(text: str, start: int, count: int) -> str:
    """Return *text* with ``count`` characters removed, beginning at index ``start``."""
    head = text[:start]
    tail = text[start + count :]
    return head + tail
def is_control_character(char: str) -> bool:
    """Return True when *char* is one of the keys of ``CONTROL_CHARACTERS``."""
    known = CONTROL_CHARACTERS
    return char in known
def is_valid_string_character(char: str) -> bool:
    """Return True when the code point of *char* lies in U+0020..U+10FFFF."""
    code_point = ord(char)
    return code_point >= 0x20 and code_point <= 0x10FFFF
def is_quote(char: str) -> bool:
    """Return True when *char* is accepted as a single or a double quote."""
    if is_single_quote(char):
        return True
    return is_double_quote(char)
def is_single_quote(char: str) -> bool:
    """Test whether the given character is a single quote character.

    Besides the ASCII apostrophe this also accepts the typographic quotes,
    the backtick and the acute accent.
    """
    single_quotes = (
        "'",  # U+0027
        "‘",  # U+2018
        "’",  # U+2019
        "`",  # U+0060
        "´",  # U+00B4
    )
    return char in single_quotes
def is_double_quote(char: str) -> bool:
    """Return True for the ASCII double quote and for the left/right typographic double quotes."""
    if is_ascii_double_quote(char):
        return True
    if is_double_quote_left(char):
        return True
    return is_double_quote_right(char)
def is_ascii_double_quote(char: str) -> bool:
    """Return True when *char* is the plain ASCII double quote (U+0022)."""
    ascii_double_quote = '"'  # U+0022
    return char == ascii_double_quote
def is_double_quote_left(char: str) -> bool:
    """Return True when *char* is the left typographic double quote (U+201C)."""
    left_double_quote = "“"  # U+201C
    return char == left_double_quote
def is_double_quote_right(char: str) -> bool:
    """Return True when *char* is the right typographic double quote (U+201D)."""
    right_double_quote = "”"  # U+201D
    return char == right_double_quote
def is_start_of_value(char: str) -> bool:
    """Return True when *char* can be the first character of a JSON-like value.

    That is: a word character (letter, digit or underscore), a minus sign,
    an opening bracket or brace, or any character accepted by ``is_quote``.
    """
    # The "[" is escaped inside the character class: an unescaped "[[" makes
    # the re module emit "FutureWarning: Possible nested set".
    if re.fullmatch(r"[\[{\w-]", char):
        return True
    return is_quote(char)
def ends_with_comma_or_newline(text: str) -> bool:
    """Return True when *text* ends with a comma or newline, optionally followed by spaces, tabs or carriage returns."""
    trailing = re.search(r"[,\n][ \t\r]*$", text)
    return trailing is not None
def is_whitespace(char: str) -> bool:
    """Return True when *char* counts as whitespace according to ``str.isspace``."""
    return str.isspace(char)
def is_special_whitespace(char: str) -> bool:
    """Check if the given character is a special whitespace character, some unicode variant.

    Covers U+00A0 (non-breaking space), the range U+2000..U+200A, U+202F,
    U+205F and U+3000.
    """
    code_point = ord(char)
    if 0x2000 <= code_point <= 0x200A:
        return True
    return char in ("\u00A0", "\u202F", "\u205F", "\u3000")
def insert_before_last_whitespace(text: str, text_to_insert: str) -> str:
    """Insert *text_to_insert* into *text* just before its trailing whitespace.

    When *text* has no trailing whitespace the insertion is appended. When
    *text* is empty or consists of whitespace only, the insertion is placed
    at the very start.
    """
    index = len(text)
    # The lower bound matters: without it an empty or all-whitespace text
    # makes the scan run past the start of the string (IndexError).
    while index > 0 and is_whitespace(text[index - 1]):
        index -= 1
    return text[:index] + text_to_insert + text[index:]
def strip_last_occurrence(
    text: str, text_to_strip: str, strip_remaining: bool = False
) -> str:
    """Remove the last occurrence of *text_to_strip* from *text*.

    With ``strip_remaining=True`` everything after that occurrence is removed
    as well. When *text_to_strip* does not occur, *text* is returned unchanged.
    """
    # rfind() reports a missing needle as -1 instead of raising ValueError
    index = text.rfind(text_to_strip)
    if index == -1:
        return text
    if strip_remaining:
        return text[:index]
    # skip the whole needle, not just a single character
    return text[:index] + text[index + len(text_to_strip) :]
def is_hex(char: str) -> bool:
    """Return True when *char* is a single ASCII hexadecimal digit (0-9, a-f, A-F).

    ``int(char, 16)`` is deliberately not used: it also accepts non-ASCII
    decimal digits (for example U+0663), which are not valid in a JSON
    ``\\uXXXX`` escape.
    """
    return len(char) == 1 and char in "0123456789abcdefABCDEF"
def is_delimiter(char: str) -> bool:
    """Return True when *char* ends an unquoted symbol: punctuation, a newline or any quote."""
    if char in ",:[]{}()\n'":
        return True
    return is_quote(char)
def at_end_of_block_comment(text: str, i: int) -> bool:
    """Return True when the two characters of *text* starting at index *i* are ``*/``.

    Returns False (rather than raising IndexError) when fewer than two
    characters remain at *i*.
    """
    return text.startswith("*/", i)
class JsonRepairError(Exception):
    """Error raised while repairing a document.

    The message is suffixed with `` at position <position>`` and the position
    itself is kept on the ``position`` attribute.
    """

    def __init__(self, message: str, position: int):
        full_message = f"{message} at position {position}"
        super().__init__(full_message)
        self.position = position
class JsonRepair:
    """Repairs invalid JSON, i.e. change JavaScript notation into JSON notation.

    Example:

        try:
            text = "{name: 'John'}"
            repaired = JsonRepair(text).repair()
            print(repaired)
            # '{"name": "John"}'
        except JsonRepairError as err:
            print(err)
    """

    # Only ASCII digits are valid in a JSON number; str.isdigit() would also
    # accept other Unicode digits.
    _DIGITS = "0123456789"

    def __init__(self, text: str):
        self.text = text
        self.i = 0  # current index in text
        self.output = ""  # generated output

    # ------------------------------------------------------------------ cursor

    def char(self, pos: int = 0) -> str:
        """Return the character at the current index plus *pos*."""
        return self.text[self.i + pos]

    def inc(self, by: int = 1) -> None:
        """Move the current index forward."""
        self.i += by

    def dec(self, by: int = 1) -> None:
        """Move the current index backward."""
        self.i -= by

    def is_start_of_document(self, pos: int = 0) -> bool:
        """Return True when the current index plus *pos* is exactly 0."""
        return self.i + pos == 0

    def is_end_of_document(self, pos: int = 0) -> bool:
        """Return True when the current index plus *pos* is at or past the end of the text."""
        return self.i + pos >= len(self.text)

    def _at(self, chars: str) -> bool:
        """Return True when a current character exists and is one of *chars*."""
        return not self.is_end_of_document() and self.char() in chars

    # ------------------------------------------------------------------ driver

    def repair(self) -> str:
        """Repair the text given to the constructor and return the resulting JSON.

        Raises:
            JsonRepairError: when the text cannot be repaired.
        """
        # start from a clean state so that repair() can be called more than once
        self.i = 0
        self.output = ""

        processed = self.parse_value()
        if not processed:
            raise self.unexpected_end()

        processed_comma = self.parse_character(",")
        if processed_comma:
            self.parse_whitespace_and_skip_comments()

        if (
            not self.is_end_of_document()
            and is_start_of_value(self.char())
            and ends_with_comma_or_newline(self.output)
        ):
            # start of a new value after end of the root level object: looks like
            # newline delimited JSON -> turn into a root level array
            if not processed_comma:
                # repair missing comma
                self.output = insert_before_last_whitespace(self.output, ",")
            self.parse_newline_delimited_json()
        elif processed_comma:
            # repair: remove trailing comma
            self.output = strip_last_occurrence(self.output, ",")

        if self.is_end_of_document():
            # reached the end of the document properly
            return self.output
        raise self.unexpected_character()

    def parse_value(self) -> bool:
        """Parse one value of any kind, including surrounding whitespace and comments."""
        self.parse_whitespace_and_skip_comments()
        processed = (
            self.parse_object()
            or self.parse_array()
            or self.parse_string()
            or self.parse_number()
            or self.parse_keywords()
            or self.parse_unquoted_string()
        )
        self.parse_whitespace_and_skip_comments()
        return processed

    # ------------------------------------------------- whitespace and comments

    def parse_whitespace_and_skip_comments(self) -> bool:
        """Copy whitespace to the output and skip comments; return True when anything was consumed."""
        start = self.i
        self.parse_whitespace()
        # keep going while comments are found, so that directly adjacent
        # comments ("/*a*//*b*/") are all skipped
        while self.parse_comment():
            self.parse_whitespace()
        return self.i > start

    def parse_whitespace(self) -> bool:
        """Copy a run of whitespace to the output, replacing special whitespace by a plain space."""
        whitespace = ""
        while not self.is_end_of_document():
            char = self.char()
            normal = is_whitespace(char)
            special = is_special_whitespace(char)
            if not normal and not special:
                break
            # repair special whitespace
            whitespace += " " if special else char
            self.inc()
        if whitespace:
            self.output += whitespace
            return True
        return False

    def parse_comment(self) -> bool:
        """Skip a block comment '/* ... */' or a line comment '// ...' at the current index."""
        # both comment markers are two characters long
        if self.is_end_of_document(pos=+1) or self.char() != "/":
            return False
        marker = self.char(pos=+1)
        if marker == "*":
            # repair block comment by skipping it; the pos=+1 guard keeps the
            # two-character look-ahead inside the text
            while not self.is_end_of_document(pos=+1) and not at_end_of_block_comment(
                self.text, self.i
            ):
                self.inc()
            # skip the closing "*/"; an unterminated comment runs to the end of the text
            self.i = min(self.i + 2, len(self.text))
            return True
        if marker == "/":
            # repair line comment by skipping it
            while not self.is_end_of_document() and self.char() != "\n":
                self.inc()
            return True
        return False

    # -------------------------------------------------------- single characters

    def parse_character(self, char: str) -> bool:
        """Copy *char* to the output when it is the current character."""
        if self._at_exactly(char):
            self.output += char
            self.inc()
            return True
        return False

    def skip_character(self, char: str) -> bool:
        """Consume *char* without writing it to the output when it is the current character."""
        if self._at_exactly(char):
            self.inc()
            return True
        return False

    def _at_exactly(self, char: str) -> bool:
        """Return True when a current character exists and equals *char*."""
        return not self.is_end_of_document() and self.char() == char

    def skip_escape_character(self) -> bool:
        """Consume a backslash at the current index, if any."""
        return self.skip_character("\\")

    # --------------------------------------------------------------- containers

    def parse_object(self) -> bool:
        """Parse an object like '{"key": "value"}'"""
        if not self._at_exactly("{"):
            return False
        self.output += "{"
        self.inc()
        self.parse_whitespace_and_skip_comments()

        initial = True
        while not self.is_end_of_document() and self.char() != "}":
            # a separating comma is only written from the second entry on
            has_separator = not initial
            if initial:
                initial = False
            else:
                if not self.parse_character(","):
                    # repair missing comma
                    self.output = insert_before_last_whitespace(self.output, ",")
                self.parse_whitespace_and_skip_comments()

            processed_key = self.parse_string() or self.parse_unquoted_string()
            if not processed_key:
                if self.is_end_of_document() or self.char() in "{}[]":
                    # repair trailing comma; without a separator of our own the
                    # last comma in the output belongs to an enclosing value
                    if has_separator:
                        self.output = strip_last_occurrence(self.output, ",")
                    break
                raise self.object_key_expected()

            self.parse_whitespace_and_skip_comments()
            processed_colon = self.parse_character(":")
            if not processed_colon:
                # the end-of-document guard avoids an IndexError on input like '{"a"'
                if not self.is_end_of_document() and is_start_of_value(self.char()):
                    # repair missing colon
                    self.output = insert_before_last_whitespace(self.output, ":")
                else:
                    raise self.colon_expected()

            processed_value = self.parse_value()
            if not processed_value:
                if processed_colon:
                    raise self.object_value_expected()
                raise self.colon_expected()

        if self._at_exactly("}"):
            self.output += "}"
            self.inc()
        else:
            # repair missing end bracket
            self.output = insert_before_last_whitespace(self.output, "}")
        return True

    def parse_array(self) -> bool:
        """Parse an array like '["item1", "item2", ...]'"""
        if not self._at_exactly("["):
            return False
        self.output += "["
        self.inc()
        self.parse_whitespace_and_skip_comments()

        initial = True
        while not self.is_end_of_document() and self.char() != "]":
            # a separating comma is only written from the second item on
            has_separator = not initial
            if initial:
                initial = False
            elif not self.parse_character(","):
                # repair missing comma
                self.output = insert_before_last_whitespace(self.output, ",")

            if not self.parse_value():
                # repair trailing comma; without a separator of our own the
                # last comma in the output belongs to an enclosing value
                if has_separator:
                    self.output = strip_last_occurrence(self.output, ",")
                break

        if self._at_exactly("]"):
            self.output += "]"
            self.inc()
        else:
            # repair missing closing array bracket
            self.output = insert_before_last_whitespace(self.output, "]")
        return True

    def parse_newline_delimited_json(self):
        """Parse and repair Newline Delimited JSON (NDJSON):
        multiple JSON objects separated by a newline character
        """
        initial = True
        processed_value = True
        while processed_value:
            if initial:
                initial = False
            elif not self.parse_character(","):
                # repair: add missing comma
                self.output = insert_before_last_whitespace(self.output, ",")
            processed_value = self.parse_value()

        # repair: remove trailing comma (the loop only ends on a failed value)
        self.output = strip_last_occurrence(self.output, ",")
        # repair: wrap the output inside array brackets
        self.output = f"[\n{self.output}\n]"

    # ------------------------------------------------------------------ strings

    def parse_string(self) -> bool:
        """Parse a string enclosed by double quotes "...". Can contain escaped quotes

        Repair strings enclosed in single quotes or special quotes
        Repair an escaped string

        Raises:
            JsonRepairError: on an invalid \\u escape or an invalid character.
        """
        if self.is_end_of_document():
            return False

        skip_escape_chars = self.char() == "\\"
        if skip_escape_chars:
            # repair: remove the first escape character
            self.inc()

        if self.is_end_of_document() or not is_quote(self.char()):
            return False

        # the string ends at a quote of the same family as the opening quote
        is_end_quote = is_single_quote if is_single_quote(self.char()) else is_double_quote
        # whatever the opening quote was, the output gets a plain double quote
        self.output += '"'
        self.inc()

        while not self.is_end_of_document() and not is_end_quote(self.char()):
            if self.char() == "\\":
                if self.is_end_of_document(pos=+1):
                    # repair: drop a dangling backslash at the very end of the text
                    self.inc()
                    break
                char = self.char(pos=+1)
                if char in ESCAPE_CHARACTERS:
                    self.output += self.text[self.i : self.i + 2]
                    self.inc(by=2)
                elif char == "u":
                    if not self.is_end_of_document(pos=+5) and all(
                        is_hex(self.char(pos=offset)) for offset in range(2, 6)
                    ):
                        self.output += self.text[self.i : self.i + 6]
                        self.inc(by=6)
                    else:
                        raise self.invalid_unicode_character(self.i)
                else:
                    # repair invalid escape character: remove the backslash;
                    # a control character behind it still has to be escaped
                    self.output += CONTROL_CHARACTERS.get(char, char)
                    self.inc(by=2)
            else:
                char = self.char()
                if char == '"':
                    # repair unescaped double quote. Every backslash in the
                    # input is consumed together with the character after it,
                    # so a quote reached here is never already escaped.
                    self.output += "\\" + char
                    self.inc()
                elif is_control_character(char):
                    # unescaped control character
                    self.output += CONTROL_CHARACTERS[char]
                    self.inc()
                else:
                    if not is_valid_string_character(char):
                        raise self.invalid_character(char)
                    self.output += char
                    self.inc()

            if skip_escape_chars:
                # repair: skip an escape character (nothing else to do)
                self.skip_escape_character()

        if not self.is_end_of_document() and is_quote(self.char()):
            self.inc()
        # either the normalized end quote, or the repair of a missing one
        self.output += '"'

        self.parse_concatenated_string()
        return True

    def parse_concatenated_string(self) -> bool:
        """Repair concatenated strings like "hello" + "world", change this into "helloworld" """
        processed = False
        self.parse_whitespace_and_skip_comments()
        while self._at_exactly("+"):
            processed = True
            self.inc()
            self.parse_whitespace_and_skip_comments()

            # repair: remove the end quote of the first string
            self.output = strip_last_occurrence(self.output, '"', True)
            start = len(self.output)
            if self.parse_string():
                # repair: remove the start quote of the second string
                self.output = remove_at_index(self.output, start, 1)
            else:
                # repair: the "+" is not followed by a string, close the first string again
                self.output += '"'
        return processed

    # ------------------------------------------------------------------ numbers

    def parse_number(self) -> bool:
        """Parse a number like 2.4 or 2.4e6

        Raises:
            JsonRepairError: when a digit is required but missing.
        """
        if self.is_end_of_document():
            return False

        start = self.i
        if self.char() == "-":
            self.inc()
            self._require_digit(start)

        if self._at("0"):
            self.inc()
        elif self._at("123456789"):
            self._skip_digits()

        if self._at("."):
            self.inc()
            self._require_digit(start)
            self._skip_digits()

        if self._at("eE"):
            self.inc()
            if self._at("+-"):
                self.inc()
            self._require_digit(start)
            self._skip_digits()

        if self.i > start:
            self.output += self.text[start : self.i]
            return True
        return False

    def _skip_digits(self) -> None:
        """Advance over a run of ASCII digits."""
        while self._at(self._DIGITS):
            self.inc()

    def _require_digit(self, start: int) -> None:
        """Raise the error from ``expect_digit`` when the current character is not a digit."""
        err = self.expect_digit(start)
        if err:
            raise err

    # ----------------------------------------------------------------- keywords

    def parse_keywords(self) -> bool:
        """Parse keywords true, false, null
        Repair Python keywords True, False, None
        """
        keywords = (
            ("true", "true"),
            ("false", "false"),
            ("null", "null"),
            # repair Python keywords True, False, None
            ("True", "true"),
            ("False", "false"),
            ("None", "null"),
        )
        return any(self.parse_keyword(name, value) for name, value in keywords)

    def parse_keyword(self, name: str, value: str) -> bool:
        """Write *value* to the output when the text continues with *name*."""
        if self.text[self.i : self.i + len(name)] == name:
            self.output += value
            self.inc(by=len(name))
            return True
        return False

    def parse_unquoted_string(self) -> bool:
        """Repair an unquoted string by adding quotes around it
        Repair a MongoDB function call like NumberLong("2")
        Repair a JSONP function call like callback({...});
        """
        # note that the symbol can end with whitespaces: we stop at the next delimiter
        start = self.i
        while not self.is_end_of_document() and not is_delimiter(self.char()):
            self.inc()
        if self.i == start:
            return False

        if self._at_exactly("("):
            # repair a MongoDB function call like NumberLong("2")
            # repair a JSONP function call like callback({...});
            self.inc()
            self.parse_value()
            if self._at_exactly(")"):
                # repair: skip close bracket of function call
                self.inc()
                if self._at_exactly(";"):
                    # repair: skip semicolon after JSONP call
                    self.inc()
            return True

        # else repair unquoted string
        # first, go back to prevent getting trailing whitespaces in the string
        while self.i > start and is_whitespace(self.char(pos=-1)):
            self.dec()
        symbol = self.text[start : self.i]
        self.output += json.dumps(symbol)
        return True

    # ------------------------------------------------------------------- errors

    def expect_digit(self, start: int) -> Optional["JsonRepairError"]:
        """Return an error when the current character is not an ASCII digit, else None.

        *start* is the index where the number began; it is used to quote the
        part of the number read so far in the message.
        """
        if self._at(self._DIGITS):
            return None
        num_so_far = self.text[start : self.i]
        return JsonRepairError(
            f"Invalid number '{num_so_far}', expecting a digit {self.got()}", self.i
        )

    def invalid_character(self, char: str) -> "JsonRepairError":
        return JsonRepairError("Invalid character " + json.dumps(char), self.i)

    def unexpected_character(self) -> "JsonRepairError":
        return JsonRepairError(
            "Unexpected character " + json.dumps(self.text[self.i]), self.i
        )

    def unexpected_end(self) -> "JsonRepairError":
        return JsonRepairError("Unexpected end of json string", len(self.text))

    def object_key_expected(self) -> "JsonRepairError":
        return JsonRepairError("Object key expected", self.i)

    def object_value_expected(self) -> "JsonRepairError":
        return JsonRepairError("Object value expected", self.i)

    def colon_expected(self) -> "JsonRepairError":
        return JsonRepairError("Colon expected", self.i)

    def invalid_unicode_character(self, start: int) -> "JsonRepairError":
        """Build the error for a broken \\u escape that begins at index *start*."""
        end = start + 2
        # the bound keeps the scan inside the text when the escape is cut off
        while end < len(self.text) and re.match(r"\w", self.text[end]):
            end += 1
        chars = self.text[start:end]
        return JsonRepairError(f'Invalid unicode character "{chars}"', self.i)

    def got(self) -> str:
        """Describe the current character (or the end of input) for use in error messages."""
        return (
            f"but got '{self.char()}'"
            if not self.is_end_of_document()
            else "but reached end of input"
        )