Skip to content

Commit

Permalink
Perform custom deserializations
Browse files Browse the repository at this point in the history
Items with these values will start appearing in a later version of kombu (5.3.0 onwards).

To migrate our kombu installation safely, we first add support for deserializing these objects, deploy that, and then we can migrate kombu to version 5.3.0 safely.
  • Loading branch information
jbkkd committed Oct 5, 2023
1 parent 444aa42 commit ea48a80
Showing 1 changed file with 63 additions and 11 deletions.
74 changes: 63 additions & 11 deletions kombu/utils/json.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""JSON Serialization Utilities."""

import base64
import datetime
import decimal
from decimal import Decimal
import json as stdjson
import uuid
from typing import Any, Callable, TypeVar


try:
from django.utils.functional import Promise as DjangoPromise
Expand Down Expand Up @@ -69,7 +72,19 @@ def dumps(s, _dumps=json.dumps, cls=None, default_kwargs=None, **kwargs):
**dict(default_kwargs, **kwargs))


def loads(s, _loads=json.loads, decode_bytes=True):
def object_hook(o: dict):
"""Hook function to perform custom deserialization."""
if o.keys() == {"__type__", "__value__"}:
decoder = _decoders.get(o["__type__"])
if decoder:
return decoder(o["__value__"])
else:
raise ValueError("Unsupported type", type, o)
else:
return o


def loads(s, _loads=json.loads, decode_bytes=True, object_hook=object_hook):
"""Deserialize json from string."""
# None of the json implementations supports decoding from
# a buffer/memoryview, or even reading from a stream
Expand All @@ -78,14 +93,51 @@ def loads(s, _loads=json.loads, decode_bytes=True):
# over. Note that pickle does support buffer/memoryview
# </rant>
if isinstance(s, memoryview):
s = s.tobytes().decode('utf-8')
s = s.tobytes().decode("utf-8")
elif isinstance(s, bytearray):
s = s.decode('utf-8')
s = s.decode("utf-8")
elif decode_bytes and isinstance(s, bytes):
s = s.decode('utf-8')

try:
return _loads(s)
except _DecodeError:
# catch "Unpaired high surrogate" error
return stdjson.loads(s)
s = s.decode("utf-8")

return _loads(s, object_hook=object_hook)


DecoderT = EncoderT = Callable[[Any], Any]
T = TypeVar("T")
EncodedT = TypeVar("EncodedT")


def register_type(
t: type[T],
marker: str,
encoder: Callable[[T], EncodedT],
decoder: Callable[[EncodedT], T],
):
"""Add support for serializing/deserializing native python type."""
_encoders[t] = (marker, encoder)
_decoders[marker] = decoder


_encoders: dict[type, tuple[str, EncoderT]] = {}
_decoders: dict[str, DecoderT] = {
"bytes": lambda o: o.encode("utf-8"),
"base64": lambda o: base64.b64decode(o.encode("utf-8")),
}

# NOTE: datetime should be registered before date,
# because datetime is also instance of date.
register_type(datetime, "datetime", datetime.datetime.isoformat, datetime.datetime.fromisoformat)
register_type(
datetime.date,
"date",
lambda o: o.isoformat(),
lambda o: datetime.datetime.fromisoformat(o).date(),
)
register_type(datetime.time, "time", lambda o: o.isoformat(), datetime.time.fromisoformat)
register_type(Decimal, "decimal", str, Decimal)
register_type(
uuid.UUID,
"uuid",
lambda o: {"hex": o.hex},
lambda o: uuid.UUID(**o),
)

0 comments on commit ea48a80

Please sign in to comment.