Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 307 additions & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
#include "string_util.h"
#include "time_util.h"
#include "types/redis_hash.h"

Expand Down Expand Up @@ -481,6 +482,301 @@ class CommandHRandField : public Commander {
bool no_parameters_ = true;
};

auto ParseFieldsArgs = [](const std::vector<std::string> &args, size_t fields_keyword_index,
std::vector<Slice> &fields) -> Status {
if (!util::EqualICase(args[fields_keyword_index], "FIELDS")) {
return {Status::RedisParseErr, "mandatory argument FIELDS is missing or not in the right position"};
}

auto num_fields_result = ParseInt<int64_t>(args[fields_keyword_index + 1], 10);
if (!num_fields_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*num_fields_result <= 0) {
return {Status::RedisParseErr, "numfields must be a positive integer"};
}
auto num_fields = static_cast<size_t>(*num_fields_result);

// Check we have the right number of fields
if (args.size() != fields_keyword_index + 2 + num_fields) {
return {Status::RedisParseErr, "number of fields does not match numfields"};
}

for (size_t i = fields_keyword_index + 2; i < args.size(); i++) {
fields.emplace_back(args[i]);
}

return Status::OK();
};

class CommandHExpire : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// HEXPIRE key seconds FIELDS numfields field [field ...]
// HPEXPIRE key ms FIELDS numfields field [field ...]
// Minimum: HEXPIRE key seconds FIELDS 1 field = 6 args
if (args.size() < 6) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
auto input_time = ParseInt<int64_t>(args[2], 10);
if (!input_time) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*input_time < 0) {
return {Status::RedisParseErr, "invalid time, must be >= 0"};
}
if (util::EqualICase(args[0], "hexpire")) {
expire_ms_ = static_cast<uint64_t>(*input_time) * 1000 + util::GetTimeStampMS();
} else if (util::EqualICase(args[0], "hpexpire")) {
expire_ms_ = static_cast<uint64_t>(*input_time) + util::GetTimeStampMS();
} else if (util::EqualICase(args[0], "hexpireat")) {
expire_ms_ = static_cast<uint64_t>(*input_time) * 1000;
} else if (util::EqualICase(args[0], "hpexpireat")) {
expire_ms_ = static_cast<uint64_t>(*input_time);
}

if (util::EqualICase(args[3], "FIELDS")) {
GET_OR_RET(ParseFieldsArgs(args, 3, fields_));
return Commander::Parse(args);
}

if (util::EqualICase(args[3], "NX")) {
condition_ = FieldExpireCondition::kFieldExpireTimeNotExists;
} else if (util::EqualICase(args[3], "XX")) {
condition_ = FieldExpireCondition::kFieldExpireTimeExists;
} else if (util::EqualICase(args[3], "GT")) {
condition_ = FieldExpireCondition::kFieldExpireTimeGreaterThanInput;
} else if (util::EqualICase(args[3], "LT")) {
condition_ = FieldExpireCondition::kFieldExpireTimeLessThanInput;
} else {
return {Status::RedisParseErr, "expect argument FIELDS or [NX|XX|GT|LT] is missing or not in the right position"};
}

if (util::EqualICase(args[4], "FIELDS")) {
GET_OR_RET(ParseFieldsArgs(args, 4, fields_));
return Commander::Parse(args);
}
return {Status::RedisParseErr, "mandatory argument FIELDS is missing or not in the right position"};
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
std::vector<FieldExpireResult> results;

auto s = hash_db.ExpireFields(ctx, args_[1], expire_ms_, fields_, &results, condition_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

// Return array of results
std::vector<std::string> result_strings;
result_strings.reserve(results.size());
for (const auto &r : results) {
result_strings.emplace_back(redis::Integer(static_cast<int64_t>(r)));
}
*output = redis::Array(result_strings);
return Status::OK();
}

private:
uint64_t expire_ms_ = 0;
std::vector<Slice> fields_;
FieldExpireCondition condition_ = FieldExpireCondition::kFieldNoExpireCondition;
};

class CommandHPersist : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// HPERSIST key FIELDS numfields field [field ...]
if (args.size() < 5) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

GET_OR_RET(ParseFieldsArgs(args, 2, fields_));
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
std::vector<FieldPersistResult> results;

auto s = hash_db.PersistFields(ctx, args_[1], fields_, &results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

std::vector<std::string> result_strings;
result_strings.reserve(results.size());
for (const auto &r : results) {
result_strings.emplace_back(redis::Integer(static_cast<int64_t>(r)));
}
*output = redis::Array(result_strings);
return Status::OK();
}

private:
std::vector<Slice> fields_;
};

class CommandHTTL : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// HTTL key FIELDS numfields field [field ...]
if (args.size() < 5) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

GET_OR_RET(ParseFieldsArgs(args, 2, fields_));
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
std::vector<int64_t> results;

auto s = hash_db.TTLFields(ctx, args_[1], fields_, &results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

auto current_time_ms = static_cast<int64_t>(util::GetTimeStampMS());

for (auto &r : results) {
if (r > 0) {
if (util::EqualICase(args_[0], "httl")) {
r = ((r - current_time_ms) / 1000);
} else if (util::EqualICase(args_[0], "hpttl")) {
r = (r - current_time_ms);
} else if (util::EqualICase(args_[0], "hexpiretime")) {
r = r / 1000;
} else if (util::EqualICase(args_[0], "hpexpiretime")) {
}
}
}
std::vector<std::string> result_strings;
result_strings.reserve(results.size());
for (const auto &r : results) {
result_strings.emplace_back(redis::Integer(r));
}
*output = redis::Array(result_strings);
return Status::OK();
}

private:
std::vector<Slice> fields_;
};

class CommandHMSetEX : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
// HSETEX key [FNX | FXX] [EX seconds | PX milliseconds |
// EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
// FIELDS numfields field value [field value ...]
if (args.size() < 7) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

auto parse_expire_value = [](const std::string &value_str) -> uint64_t {
auto result = ParseInt<uint64_t>(value_str, 10);
if (!result || *result <= 0) {
return 0;
}
return result.GetValue();
};

size_t pos = 2;
if (pos < args.size() && util::EqualICase(args[pos], std::string_view("FIELDS"))) {
return {Status::RedisParseErr, "ERR Missing expiration option"};
}

params_.expire_params.option = SetEXExpireOption::kNoExpire;
while (pos < args.size()) {
const auto &opt = args[pos];
if (util::EqualICase(opt, "KEEPTTL")) {
params_.expire_params.option = SetEXExpireOption::kKEEPTTL;
pos++;
} else if (util::EqualICase(opt, "FNX")) {
params_.condition = SetEXFieldCondition::kFNX;
pos++;
} else if (util::EqualICase(opt, "FXX")) {
params_.condition = SetEXFieldCondition::kFXX;
pos++;
} else if (util::EqualICase(opt, "FIELDS")) {
if (params_.expire_params.option == SetEXExpireOption::kNoExpire) {
return {Status::RedisParseErr, "Invalid syntax: at least one expiration option is required before FIELDS"};
} else {
break;
}
} else {
auto value = parse_expire_value(args[pos + 1]);
params_.expire_params.value = value;
if (value == 0) {
return {Status::RedisParseErr, "Invalid expire time value"};
}
if (util::EqualICase(opt, "EX")) {
params_.expire_params.option = SetEXExpireOption::kEX;
} else if (util::EqualICase(opt, "PX")) {
params_.expire_params.option = SetEXExpireOption::kPX;
} else if (util::EqualICase(opt, "EXAT")) {
params_.expire_params.option = SetEXExpireOption::kEXAT;
} else if (util::EqualICase(opt, "PXAT")) {
params_.expire_params.option = SetEXExpireOption::kPXAT;
} else {
return {Status::RedisParseErr, "Invalid syntax: expected EX, PX, EXAT, PXAT, KEEPTTL, FNX or FXX"};
}
pos += 2;
}
}
if (pos >= args.size() || !util::EqualICase(args[pos], "FIELDS")) {
return {Status::RedisParseErr, "mandatory argument FIELDS is missing"};
}
pos++;
if (pos >= args.size()) {
return {Status::RedisParseErr, "FIELDS requires numfields argument"};
}

auto num_fields_result = ParseInt<uint64_t>(args[pos], 10);
if (!num_fields_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (*num_fields_result <= 0) {
return {Status::RedisParseErr, "numfields must be a positive integer"};
}
auto num_fields = *num_fields_result;
pos++;

if (args.size() != pos + 2 * num_fields) {
return {Status::RedisParseErr, "number of field-value pairs does not match numfields"};
}

for (size_t i = 0; i < num_fields; i++) {
field_values_.emplace_back(args[pos], args[pos + 1]);
pos += 2;
}
return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
uint64_t ret = 0;
redis::Hash hash_db(srv->storage, conn->GetNamespace());

auto s = hash_db.MSetEx(ctx, args_[1], field_values_, params_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
if (ret == 0) {
*output = redis::Integer(0);
} else {
*output = redis::Integer(1);
}
return Status::OK();
}

private:
HSetExParams params_;
std::vector<FieldValue> field_values_;
};

REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
Expand All @@ -498,6 +794,16 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1), )
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only slow", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hpexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hpexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("hpttl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("hexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("hpexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHMSetEX>("hsetex", -7, "write", 1, 1, 1), )

} // namespace redis
43 changes: 43 additions & 0 deletions src/storage/compact_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "storage/redis_metadata.h"
#include "time_util.h"
#include "types/redis_bitmap.h"
#include "types/redis_hash.h"
#include "types/redis_timeseries.h"

namespace engine {
Expand Down Expand Up @@ -152,6 +153,48 @@ bool SubKeyFilter::Filter([[maybe_unused]] int level, const Slice &key, const Sl
return false;
}
return expired;
} else if (metadata.Type() == kRedisHash) {
auto md_expired = IsMetadataExpired(ikey, metadata);
if (md_expired) {
return true;
}

auto db = stor_->GetDB();

// Hash field subkeys come in pairs:
// 1. Field value key with version = metadata.version
// 2. Field expire key with version = metadata.version + ExpireVersionOffset (8)
// They are processed independently in compaction, so we must handle both in Filter to ensure consistency.
// To avoid orphaning, we check expire time when processing key, but compaction filter handle key one by one, so we
// have to left the orphaned key exists here.
if (ikey.GetVersion() == metadata.version) { // when there is an hash field
std::string ns_key = ComposeNamespaceKey(ikey.GetNamespace(), ikey.GetKey(), stor_->IsSlotIdEncoded());
InternalKey expire_ikey(ns_key, ikey.GetSubKey(), metadata.version + ExpireVersionOffset,
stor_->IsSlotIdEncoded());
std::string expire_key = expire_ikey.Encode();
std::string expire_value;
auto expire_status =
db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::PrimarySubkey), expire_key, &expire_value);
if (expire_status.ok() && expire_value.size() >= sizeof(uint64_t)) {
uint64_t expire_at = DecodeFixed64(expire_value.data());
uint64_t now = util::GetTimeStampMS();
if (expire_at < now) {
return true;
}
}
return false;
} else if (ikey.GetVersion() == metadata.version + ExpireVersionOffset) { // when there is an hash field expire key
std::string ns_key = ComposeNamespaceKey(ikey.GetNamespace(), ikey.GetKey(), stor_->IsSlotIdEncoded());
InternalKey origin_ikey(ns_key, ikey.GetSubKey(), metadata.version, stor_->IsSlotIdEncoded());
std::string origin_key = origin_ikey.Encode();
std::string origin_value;
auto origin_status =
db->Get(rocksdb::ReadOptions(), stor_->GetCFHandle(ColumnFamilyID::PrimarySubkey), origin_key, &origin_value);
if (origin_status.IsNotFound()) {
return true;
}
}
return false;
}

return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
Expand Down
Loading