Skip to content

Commit

Permalink
Add atomic flag management with SetFlag and GetFlag shards
Browse files Browse the repository at this point in the history
Introduce AtomicFlag struct with atomic bool for flag state management. Implement `SetFlag` shard to set flag value based on input and `GetFlag` shard to retrieve the current flag state. Integrate shared flag access using getFlag function with thread-safe handling. Register new shards in channels module.
  • Loading branch information
sinkingsugar committed Oct 31, 2024
1 parent b822a6b commit c0db158
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
70 changes: 70 additions & 0 deletions shards/modules/channels/channels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,76 @@ struct Flush : public Base {
return input;
}
};

FlagPtr getFlag(const std::string &name) {
static std::unordered_map<std::string, std::weak_ptr<AtomicFlag>> flags;
static std::shared_mutex mutex;

std::shared_lock<decltype(mutex)> _l(mutex);
auto it = flags.find(name);
if (it == flags.end()) {
_l.unlock();
std::scoped_lock<decltype(mutex)> _l1(mutex);
auto sp = std::make_shared<AtomicFlag>();
flags[name] = sp;
return sp;
} else {
std::shared_ptr<AtomicFlag> sp = it->second.lock();
if (!sp) {
_l.unlock();
std::scoped_lock<decltype(mutex)> _l1(mutex);
sp = std::make_shared<AtomicFlag>();
flags[name] = sp;
}
return sp;
}
}

// Set flag value shard
struct SetFlag : public Base {
FlagPtr _flag;

static inline Parameters setFlagParams{
{"Name", SHCCSTR("The name of the flag."), {CoreInfo::StringType}},
};

static SHTypesInfo inputTypes() { return CoreInfo::BoolType; }
static SHTypesInfo outputTypes() { return CoreInfo::BoolType; }
static SHParametersInfo parameters() { return setFlagParams; }

SHTypeInfo compose(const SHInstanceData &data) {
_flag = getFlag(_name);
return data.inputType;
}

SHVar activate(SHContext *context, const SHVar &input) {
_flag->value.store(input.payload.boolValue);
return input;
}
};

// Get flag value shard
struct GetFlag : public Base {
FlagPtr _flag;

static inline Parameters getFlagParams{
{"Name", SHCCSTR("The name of the flag."), {CoreInfo::StringType}},
};

static SHTypesInfo inputTypes() { return CoreInfo::AnyType; }
static SHTypesInfo outputTypes() { return CoreInfo::BoolType; }
static SHParametersInfo parameters() { return getFlagParams; }

SHTypeInfo compose(const SHInstanceData &data) {
_flag = getFlag(_name);
return CoreInfo::BoolType;
}

SHVar activate(SHContext *context, const SHVar &input) { return Var(_flag->value.load()); }
};
} // namespace channels
} // namespace shards

SHARDS_REGISTER_FN(channels) {
using namespace shards::channels;
REGISTER_SHARD("Produce", Produce);
Expand All @@ -521,4 +589,6 @@ SHARDS_REGISTER_FN(channels) {
REGISTER_SHARD("Listen", Listen);
REGISTER_SHARD("Complete", Complete);
REGISTER_SHARD("Flush", Flush);
REGISTER_SHARD("SetFlag", SetFlag);
REGISTER_SHARD("GetFlag", GetFlag);
}
8 changes: 8 additions & 0 deletions shards/modules/channels/channels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ class BroadcastChannel : public ChannelShared {
using Channel = std::variant<DummyChannel, MPMCChannel, BroadcastChannel>;
std::shared_ptr<Channel> get(const std::string &name);

// Atomic flag wrapper
struct AtomicFlag {
std::atomic_bool value{false};
};

using FlagPtr = std::shared_ptr<AtomicFlag>;
FlagPtr getFlag(const std::string &name);

} // namespace channels
} // namespace shards

Expand Down
3 changes: 3 additions & 0 deletions shards/tests/hello.shs
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,6 @@ ToJson | Yaml.FromJson | Log

sorted-table-test | First | ExpectSeq | Assert.Is([["A" 1] 1])
sorted-table-test | Last | ExpectSeq | Assert.Is([["E"] 6])

true | SetFlag("test-flag")
GetFlag("test-flag") | Log | Assert.Is(true)

0 comments on commit c0db158

Please sign in to comment.