AgentVerse's picture
bump version to 0.1.8
01523b5
# jsonrepair.py - Repair invalid JSON documents in Python
#
# Just https://github.com/josdejong/jsonrepair ported from TypeScript to Python.
#
# This port won't get updates, because the goal should be to generate this library instead.
#
# See: https://github.com/josdejong/jsonrepair/issues/84
#
import json
import re
from typing import Optional
CONTROL_CHARACTERS = {"\b": "\\b", "\f": "\\f", "\n": "\\n", "\r": "\\r", "\t": "\\t"}
ESCAPE_CHARACTERS = {
'"': '"',
"\\": "\\",
"/": "/",
"b": "\b",
"f": "\f",
"n": "\n",
"r": "\r",
"t": "\t"
# note that \u is handled separately in parseString()
}
def remove_at_index(text: str, start: int, count: int) -> str:
return text[0:start] + text[start + count :]
def is_control_character(char: str) -> bool:
return char in CONTROL_CHARACTERS
def is_valid_string_character(char: str) -> bool:
return 0x20 <= ord(char) <= 0x10FFFF
def is_quote(char: str) -> bool:
return is_single_quote(char) or is_double_quote(char)
def is_single_quote(char: str) -> bool:
"""Test whether the given character is a single quote character.
Also tests for special variants of single quotes.
"""
return char in (
"'", # U+0027
"‘", # U+2018
"’", # U+2019
"`", # U+0060
"´", # U+00B4
)
def is_double_quote(char: str) -> bool:
return (
is_ascii_double_quote(char)
or is_double_quote_left(char)
or is_double_quote_right(char)
)
def is_ascii_double_quote(char: str) -> bool:
return char == '"' # U+0022
def is_double_quote_left(char: str) -> bool:
return char == "“" # U+201C
def is_double_quote_right(char: str) -> bool:
return char == "”" # U+201D
def is_start_of_value(char: str) -> bool:
regex_start_of_value = (
r"^[[{\w-]$" # alpha, number, minus, or opening bracket or brace
)
return bool(re.search(regex_start_of_value, char)) or is_quote(char)
def ends_with_comma_or_newline(text: str) -> bool:
return bool(re.search(r"[,\n][ \t\r]*$", text))
def is_whitespace(char: str) -> bool:
return char.isspace()
def is_special_whitespace(char: str) -> bool:
"""Check if the given character is a special whitespace character, some unicode variant"""
return (
char == "\u00A0" # non-breaking space
or ord("\u2000") <= ord(char) <= ord("\u200A")
or char == "\u202F"
or char == "\u205F"
or char == "\u3000"
)
def insert_before_last_whitespace(text: str, text_to_insert: str) -> str:
index = len(text)
if not is_whitespace(text[index - 1]):
# no trailing whitespaces
return text + text_to_insert
while is_whitespace(text[index - 1]):
index -= 1
return text[:index] + text_to_insert + text[index:]
def strip_last_occurrence(
text: str, text_to_strip: str, strip_remaining: bool = False
) -> str:
index = text.rindex(text_to_strip)
try:
return text[:index] + ("" if strip_remaining else text[index + 1 :])
except ValueError:
return text
def is_hex(char: str) -> bool:
try:
int(char, 16)
return True
except ValueError:
return False
def is_delimiter(char: str) -> bool:
return char in ",:[]{}()\n'" or is_quote(char)
def at_end_of_block_comment(text: str, i: int) -> bool:
return text[i] == "*" and text[i + 1] == "/"
class JsonRepairError(Exception):
def __init__(self, message: str, position: int):
super(JsonRepairError, self).__init__(message + f" at position {position}")
self.position = position
class JsonRepair:
"""Repairs invalid JSON, i.e. change JavaScript notation into JSON notation.
Example:
try:
json = "{name: 'John'}"
repaired = JsonRepair(json).repair()
print(repaired)
# '{"name": "John"}'
except JsonRepairFailed as err:
print(err)
"""
def __init__(self, text: str):
self.text = text
self.i = 0 # current index in text
self.output = "" # generated output
def char(self, pos: int = 0) -> str:
return self.text[self.i + pos]
def inc(self, by: int = 1) -> None:
self.i += by
def dec(self, by: int = 1) -> None:
self.i -= by
def is_start_of_document(self, pos: int = 0) -> bool:
return self.i + pos == 0
def is_end_of_document(self, pos: int = 0) -> bool:
return self.i + pos >= len(self.text)
def repair(self) -> str:
processed = self.parse_value()
if not processed:
raise self.unexpected_end()
processed_comma = self.parse_character(",")
if processed_comma:
self.parse_whitespace_and_skip_comments()
if (
not self.is_end_of_document()
and is_start_of_value(self.char())
and ends_with_comma_or_newline(self.output)
):
# start of a new value after end of the root level object: looks like
# newline delimited JSON -> turn into a root level array
if not processed_comma:
# repair missing comma
self.output = insert_before_last_whitespace(self.output, ",")
self.parse_newline_delimited_json()
elif processed_comma:
# repair: remove trailing comma
self.output = strip_last_occurrence(self.output, ",")
if self.is_end_of_document():
# reached the end of the document properly
return self.output
raise self.unexpected_character()
def parse_value(self) -> bool:
self.parse_whitespace_and_skip_comments()
processed = (
self.parse_object()
or self.parse_array()
or self.parse_string()
or self.parse_number()
or self.parse_keywords()
or self.parse_unquoted_string()
)
self.parse_whitespace_and_skip_comments()
return processed
def parse_whitespace_and_skip_comments(self) -> bool:
start = self.i
changed = self.parse_whitespace()
while True:
changed = self.parse_comment()
if changed:
changed = self.parse_whitespace()
if not changed:
break
return self.i > start
def parse_whitespace(self) -> bool:
whitespace = ""
while not self.is_end_of_document():
char = self.char()
normal = is_whitespace(char)
special = is_special_whitespace(char)
if not normal and not special:
break
if special:
whitespace += " " # repair special whitespace
else:
whitespace += char
self.inc()
if whitespace:
self.output += whitespace
return True
return False
def parse_comment(self) -> bool:
# find a block comment '/* ... */'
if not self.is_end_of_document() and not self.is_end_of_document(pos=+1):
if self.char() == "/" and self.char(pos=+1) == "*":
# repair block comment by skipping it
while not self.is_end_of_document() and not at_end_of_block_comment(
self.text, self.i
):
self.inc()
self.inc(by=2)
return True
# find a line comment '// ...'
if self.char() == "/" and self.char(pos=+1) == "/":
# repair line comment by skipping it
while not self.is_end_of_document() and self.char() != "\n":
self.inc()
return True
return False
def parse_character(self, char: str) -> bool:
if not self.is_end_of_document():
if self.char() == char:
self.output += char
self.inc()
return True
return False
def skip_character(self, char: str) -> bool:
if not self.is_end_of_document() and self.char() == char:
self.inc()
return True
return False
def skip_escape_character(self) -> bool:
return self.skip_character("\\")
def parse_object(self) -> bool:
"""Parse an object like '{"key": "value"}'"""
if not self.is_end_of_document() and self.char() == "{":
self.output += "{"
self.inc()
self.parse_whitespace_and_skip_comments()
initial = True
while not self.is_end_of_document() and self.char() != "}":
if not initial:
processed_comma = self.parse_character(",")
if not processed_comma:
# repair missing comma
self.output = insert_before_last_whitespace(self.output, ",")
self.parse_whitespace_and_skip_comments()
else:
processed_comma = True
initial = False
processed_key = self.parse_string() or self.parse_unquoted_string()
if not processed_key:
if self.is_end_of_document() or self.char() in "{}[]":
# repair trailing comma
self.output = strip_last_occurrence(self.output, ",")
break
raise self.object_key_expected()
self.parse_whitespace_and_skip_comments()
processed_colon = self.parse_character(":")
if not processed_colon:
if is_start_of_value(self.char()):
# repair missing colon
self.output = insert_before_last_whitespace(self.output, ":")
else:
raise self.colon_expected()
processed_value = self.parse_value()
if not processed_value:
if processed_colon:
raise self.object_value_expected()
raise self.colon_expected()
if not self.is_end_of_document() and self.char() == "}":
self.output += "}"
self.inc()
else:
# repair missing end bracket
self.output = insert_before_last_whitespace(self.output, "}")
return True
return False
def parse_array(self) -> bool:
"""Parse an array like '["item1", "item2", ...]'"""
if not self.is_end_of_document() and self.char() == "[":
self.output += "["
self.inc()
self.parse_whitespace_and_skip_comments()
initial = True
while not self.is_end_of_document() and self.char() != "]":
if not initial:
processed_comma = self.parse_character(",")
if not processed_comma:
# repair missing comma
self.output = insert_before_last_whitespace(self.output, ",")
else:
initial = False
processed_value = self.parse_value()
if not processed_value:
# repair trailing comma
self.output = strip_last_occurrence(self.output, ",")
break
if not self.is_end_of_document() and self.char() == "]":
self.output += "]"
self.inc()
else:
# repair missing closing array bracket
self.output = insert_before_last_whitespace(self.output, "]")
return True
return False
def parse_newline_delimited_json(self):
"""Parse and repair Newline Delimited JSON (NDJSON):
multiple JSON objects separated by a newline character
"""
# repair NDJSON
initial = True
processed_value = True
while processed_value:
if not initial:
# parse optional comma, insert when missing
processed_comma = self.parse_character(",")
if not processed_comma:
# repair: add missing comma
self.output = insert_before_last_whitespace(self.output, ",")
else:
initial = False
processed_value = self.parse_value()
if not processed_value:
# repair: remove trailing comma
self.output = strip_last_occurrence(self.output, ",")
# repair: wrap the output inside array brackets
self.output = f"[\n{self.output}\n]"
def parse_string(self) -> bool:
"""Parse a string enclosed by double quotes "...". Can contain escaped quotes
Repair strings enclosed in single quotes or special quotes
Repair an escaped string
"""
if not self.is_end_of_document():
skip_escape_chars = self.char() == "\\"
if skip_escape_chars:
# repair: remove the first escape character
self.inc()
skip_escape_chars = True
if not self.is_end_of_document() and is_quote(self.char()):
is_end_quote = (
is_single_quote if is_single_quote(self.char()) else is_double_quote
)
if self.char() != '"':
pass # TODO?: repair non-normalized quote
self.output += '"'
self.inc()
while not self.is_end_of_document() and not is_end_quote(self.char()):
if self.char() == "\\":
char = self.char(pos=+1)
escape_char = ESCAPE_CHARACTERS.get(char)
if escape_char:
self.output += self.text[self.i : self.i + 2]
self.inc(by=2)
elif char == "u":
if (
not self.is_end_of_document(pos=+5)
and is_hex(self.char(pos=+2))
and is_hex(self.char(pos=+3))
and is_hex(self.char(pos=+4))
and is_hex(self.char(pos=+5))
):
self.output += self.text[self.i : self.i + 6]
self.inc(by=6)
else:
raise self.invalid_unicode_character(self.i)
else:
# repair invalid escape character: remove it
self.output += char
self.inc(by=2)
else:
char = self.char()
if char == '"' and self.char(pos=-1) != "\\":
# repair unescaped double quote
self.output += "\\" + char
self.inc()
elif is_control_character(char):
# unescaped control character
self.output += CONTROL_CHARACTERS[char]
self.inc()
else:
if not is_valid_string_character(char):
raise self.invalid_character(char)
self.output += char
self.inc()
if skip_escape_chars:
processed = self.skip_escape_character()
if processed:
pass # repair: skipped escape character (nothing to do)
if not self.is_end_of_document() and is_quote(self.char()):
if self.char() != '"':
pass # TODO:? repair non-normalized quote
self.output += '"'
self.inc()
else:
# repair missing end quote
self.output += '"'
self.parse_concatenated_string()
return True
return False
def parse_concatenated_string(self) -> bool:
"""Repair concatenated strings like \"hello\" + \"world\", change this into \"helloworld\" """
processed = False
self.parse_whitespace_and_skip_comments()
while not self.is_end_of_document() and self.char() == "+":
processed = True
self.inc()
self.parse_whitespace_and_skip_comments()
# repair: remove the end quote of the first string
self.output = strip_last_occurrence(self.output, '"', True)
start = len(self.output)
self.parse_string()
# repair: remove the start quote of the second string
self.output = remove_at_index(self.output, start, 1)
return processed
def parse_number(self) -> bool:
"""Parse a number like 2.4 or 2.4e6"""
if not self.is_end_of_document():
start = self.i
if self.char() == "-":
self.inc()
err = self.expect_digit(start)
if err:
raise err
if not self.is_end_of_document() and self.char() == "0":
self.inc()
elif not self.is_end_of_document() and self.char() in "123456789":
self.inc()
while not self.is_end_of_document() and self.char().isdigit():
self.inc()
if not self.is_end_of_document() and self.char() == ".":
self.inc()
err = self.expect_digit(start)
if err:
raise err
while not self.is_end_of_document() and self.char().isdigit():
self.inc()
if not self.is_end_of_document() and self.char() in "eE":
self.inc()
if not self.is_end_of_document() and self.char() in "+-":
self.inc()
err = self.expect_digit(start)
if err:
raise err
while not self.is_end_of_document() and self.char().isdigit():
self.inc()
if self.i > start:
self.output += self.text[start : self.i]
return True
return False
def parse_keywords(self) -> bool:
"""Parse keywords true, false, null
Repair Python keywords True, False, None
"""
return (
self.parse_keyword("true", "true")
or self.parse_keyword("false", "false")
or self.parse_keyword("null", "null")
# repair Python keywords True, False, None
or self.parse_keyword("True", "true")
or self.parse_keyword("False", "false")
or self.parse_keyword("None", "null")
)
def parse_keyword(self, name: str, value: str) -> bool:
if self.text[self.i : self.i + len(name)] == name:
self.output += value
self.inc(by=len(name))
return True
return False
def parse_unquoted_string(self) -> bool:
"""Repair and unquoted string by adding quotes around it
Repair a MongoDB function call like NumberLong("2")
Repair a JSONP function call like callback({...});
"""
# note that the symbol can end with whitespaces: we stop at the next delimiter
start = self.i
while not self.is_end_of_document() and not is_delimiter(self.char()):
self.inc()
if self.i > start:
if not self.is_end_of_document() and self.char() == "(":
# repair a MongoDB function call like NumberLong("2")
# repair a JSONP function call like callback({...});
self.inc()
self.parse_value()
if not self.is_end_of_document() and self.char() == ")":
# repair: skip close bracket of function call
self.inc()
if not self.is_end_of_document() and self.char() == ";":
# repair: skip semicolon after JSONP call
self.inc()
return True
# else repair unquoted string
# first, go back to prevent getting trailing whitespaces in the string
while not self.is_start_of_document() and is_whitespace(self.char(pos=-1)):
self.dec()
symbol = self.text[start : self.i]
self.output += json.dumps(symbol)
return True
return False
def expect_digit(self, start: int) -> Optional[JsonRepairError]:
if self.is_end_of_document() or not self.char().isdigit():
num_so_far = self.text[start : self.i]
return JsonRepairError(
f"Invalid number '{num_so_far}', expecting a digit {self.got()}", 2
)
def invalid_character(self, char: str) -> JsonRepairError:
return JsonRepairError("Invalid character " + json.dumps(char), self.i)
def unexpected_character(self) -> JsonRepairError:
return JsonRepairError(
"Unexpected character " + json.dumps(self.text[self.i]), self.i
)
def unexpected_end(self) -> JsonRepairError:
return JsonRepairError("Unexpected end of json string", len(self.text))
def object_key_expected(self) -> JsonRepairError:
return JsonRepairError("Object key expected", self.i)
def object_value_expected(self) -> JsonRepairError:
return JsonRepairError("Object value expected", self.i)
def colon_expected(self) -> JsonRepairError:
return JsonRepairError("Colon expected", self.i)
def invalid_unicode_character(self, start: int) -> JsonRepairError:
end = start + 2
while re.match(r"\w", self.text[end]):
end += 1
chars = self.text[start:end]
return JsonRepairError(f'Invalid unicode character "{chars}"', self.i)
def got(self) -> str:
return (
f"but got '{self.char()}'"
if not self.is_end_of_document()
else "but reached end of input"
)