Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 129 additions & 30 deletions cassandra/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
from cassandra.protocol import _UNSET_VALUE
from cassandra.util import OrderedDict, _sanitize_identifiers

try:
from cassandra.serializers import make_serializers as _cython_make_serializers
_HAVE_CYTHON_SERIALIZERS = True
except ImportError:
_HAVE_CYTHON_SERIALIZERS = False

import logging
log = logging.getLogger(__name__)

Expand Down Expand Up @@ -474,6 +480,32 @@ def __init__(self, column_metadata, query_id, routing_key_indexes, query,
self.is_idempotent = False
self._is_lwt = is_lwt

@property
def _serializers(self):
    """Return the cached per-column serializers, building them on first use.

    The result is ``None`` whenever a column encryption policy is set on
    this statement. That check runs on every access, ahead of the cache
    lookup, so a policy assigned after construction still bypasses any
    serializers that were cached earlier.

    Otherwise the value stored in ``_cached_serializers`` is returned. On
    the first access it is populated with the result of
    ``_cython_make_serializers`` over the column types when the Cython
    serializers were importable and ``column_metadata`` is non-empty, and
    with ``None`` in every other case.
    """
    if self.column_encryption_policy:
        return None

    # The attribute only exists once a previous access has stored a
    # result, and that result may legitimately be None.
    if hasattr(self, '_cached_serializers'):
        return self._cached_serializers

    built = None
    if _HAVE_CYTHON_SERIALIZERS and self.column_metadata:
        column_types = [column.type for column in self.column_metadata]
        built = _cython_make_serializers(column_types)
    self._cached_serializers = built
    return built

@classmethod
def from_message(cls, query_id, column_metadata, pk_indexes, cluster_metadata,
query, prepared_keyspace, protocol_version, result_metadata,
Expand Down Expand Up @@ -532,6 +564,26 @@ def __str__(self):
__repr__ = __str__


def _raise_bind_serialize_error(col_spec, value, exc):
    """Re-raise a serialization failure as a TypeError naming the column.

    The bind loops call this from their ``except`` clauses so that every
    path reports the same message: the column name, the expected column
    type, the Python type of the offending value and the original error
    text.

    ``struct.error`` and ``OverflowError`` are reported as
    "value out of range" (the call sites note these come from int32 and
    float range failures respectively); any other exception passed in is
    reported as "invalid type".

    :param col_spec: column metadata; its ``name`` and ``type`` are read.
    :param value: the value that failed to serialize.
    :param exc: the exception caught by the caller; it becomes the
        ``__cause__`` of the raised error.
    :raises TypeError: always.
    """
    out_of_range = isinstance(exc, (struct.error, OverflowError))
    problem = 'value out of range' if out_of_range else 'invalid type'
    template = ('Received an argument with %s for column "%s". '
                'Expected: %s, Got: %s; (%s)')
    details = (problem, col_spec.name, col_spec.type, type(value), exc)
    raise TypeError(template % details) from exc


class BoundStatement(Statement):
"""
A prepared statement that has been bound to a particular set of values.
Expand Down Expand Up @@ -635,44 +687,91 @@ def bind(self, values):
(value_len, len(self.prepared_statement.routing_key_indexes)))

self.raw_values = values
self.values = []
for value, col_spec in zip(values, col_meta):
if value is None:
self.values.append(None)
elif value is UNSET_VALUE:
if proto_version >= 4:
self._append_unset_value()
# Pre-allocate to avoid repeated list growth reallocations
self.values = [None] * col_meta_len
idx = 0
if ce_policy:
# Column encryption path: check each column for CE policy
for value, col_spec in zip(values, col_meta):
if value is None:
self.values[idx] = None
elif value is UNSET_VALUE:
if proto_version >= 4:
idx = self._append_unset_value(idx)
continue
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
try:
col_desc = ColDesc(col_spec.keyspace_name, col_spec.table_name, col_spec.name)
uses_ce = ce_policy.contains_column(col_desc)
if uses_ce:
col_type = ce_policy.column_type(col_desc)
col_bytes = col_type.serialize(value, proto_version)
col_bytes = ce_policy.encrypt(col_desc, col_bytes)
else:
col_bytes = col_spec.type.serialize(value, proto_version)
self.values[idx] = col_bytes
# struct.error: int32 out-of-range; OverflowError: float out-of-range
except (TypeError, struct.error, OverflowError) as exc:
_raise_bind_serialize_error(col_spec, value, exc)
idx += 1
else:
# Fast path: no column encryption, use Cython serializers if available
serializers = self.prepared_statement._serializers
if serializers is not None:
for ser, value, col_spec in zip(serializers, values, col_meta):
if value is None:
self.values[idx] = None
elif value is UNSET_VALUE:
if proto_version >= 4:
idx = self._append_unset_value(idx)
continue
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
else:
try:
col_bytes = ser.serialize(value, proto_version)
self.values[idx] = col_bytes
# struct.error: int32 out-of-range; OverflowError: float out-of-range
except (TypeError, struct.error, OverflowError) as exc:
_raise_bind_serialize_error(col_spec, value, exc)
idx += 1
else:
try:
col_desc = ColDesc(col_spec.keyspace_name, col_spec.table_name, col_spec.name)
uses_ce = ce_policy and ce_policy.contains_column(col_desc)
col_type = ce_policy.column_type(col_desc) if uses_ce else col_spec.type
col_bytes = col_type.serialize(value, proto_version)
if uses_ce:
col_bytes = ce_policy.encrypt(col_desc, col_bytes)
self.values.append(col_bytes)
except (TypeError, struct.error) as exc:
actual_type = type(value)
message = ('Received an argument of invalid type for column "%s". '
'Expected: %s, Got: %s; (%s)' % (col_spec.name, col_spec.type, actual_type, exc))
raise TypeError(message)
for value, col_spec in zip(values, col_meta):
if value is None:
self.values[idx] = None
elif value is UNSET_VALUE:
if proto_version >= 4:
idx = self._append_unset_value(idx)
continue
else:
raise ValueError("Attempt to bind UNSET_VALUE while using unsuitable protocol version (%d < 4)" % proto_version)
else:
try:
col_bytes = col_spec.type.serialize(value, proto_version)
self.values[idx] = col_bytes
# struct.error: int32 out-of-range; OverflowError: float out-of-range
except (TypeError, struct.error, OverflowError) as exc:
_raise_bind_serialize_error(col_spec, value, exc)
idx += 1

if proto_version >= 4:
diff = col_meta_len - len(self.values)
if diff:
for _ in range(diff):
self._append_unset_value()
# Fill remaining unbound columns with UNSET_VALUE (v4+ feature).
while idx < col_meta_len:
idx = self._append_unset_value(idx)
elif idx < col_meta_len:
# Pre-v4: trim trailing unused slots (no UNSET_VALUE support)
self.values = self.values[:idx]

return self

def _append_unset_value(self):
next_index = len(self.values)
if self.prepared_statement.is_routing_key_index(next_index):
col_meta = self.prepared_statement.column_metadata[next_index]
def _append_unset_value(self, idx):
if self.prepared_statement.is_routing_key_index(idx):
col_meta = self.prepared_statement.column_metadata[idx]
raise ValueError("Cannot bind UNSET_VALUE as a part of the routing key '%s'" % col_meta.name)
self.values.append(UNSET_VALUE)
self.values[idx] = UNSET_VALUE
return idx + 1

@property
def routing_key(self):
Expand Down
20 changes: 20 additions & 0 deletions cassandra/serializers.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright ScyllaDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Declaration of the Serializer extension type. Only the attribute and the
# method signature are declared here; no method body appears in this file.
cdef class Serializer:
# The cqltypes._CassandraType corresponding to this serializer
cdef object cqltype

# Declared to take a Python value and an integer protocol version and to
# return bytes. cpdef makes it callable from both Cython and Python code.
cpdef bytes serialize(self, object value, int protocol_version)
Loading
Loading