Something similar to what's done in the luafilter could be done for filtering the kafkacdc stream: a set of Lua functions could decide, record by record, whether a record should be produced or not.
Here's some (untested) prototype code that could serve as a starting point:
// This is called by the Lua code
|
int add_record_filter(lua_State *L)
|
{
|
LuaData* data = static_cast<LuaData*>(lua_touserdata(state, lua_upvalueindex(1)));
|
|
//
|
// Check that the arguments are correct...
|
//
|
|
size_t len = 0;
|
const char* ptr;
|
ptr = lua_tolstring(L, 2, &len);
|
std::string database(ptr, len);
|
ptr = lua_tolstring(L, 3, &len);
|
std::string table(ptr, len);
|
ptr = lua_tolstring(L, 4, &len);
|
std::string column(ptr, len);
|
|
// The last value on the stack is the callback function
|
int cb = luaL_ref(state, LUA_REGISTRYINDEX);
|
data->filters[database][table][column] = cb;
|
|
return 0;
|
}
|
|
// This is called by the kafkacdc code for each column in a row
|
bool should_keep_record(lua_State *L, LuaData* data, const std::string& db, const std::string& tbl, const std::string& col, const std::string& val)
|
{
|
bool keep = true;
|
|
if (int ref = data->filters[db][tbl][col])
|
{
|
lua_rawgeti(L, LUA_REGISTRYINDEX, ref);
|
lua_pushlstring(L, val.c_str(), val.size());
|
|
if (lua_pcall(L, 1, 1) == 0)
|
{
|
keep = lua_toboolean(L, -1);
|
}
|
}
|
|
return keep;
|
}
|
I'd imagine that this sort of filtering is easier to implement in Kafka itself by processing the stream using Kafka Streams.