  1. MariaDB MaxScale
  2. MXS-4852

KafkaCDC: Option to trigger only on specific attributes in a table record.

Details

    • New Feature
    • Status: Closed
    • Minor
    • Resolution: Won't Do
    • 23.02.3
    • N/A
    • kafkacdc
    • RHEL v8.2
      VMware vCenter v7
      MariaDB v10.7 (3 node Galera cluster)
      MaxScale v23.02.3 (one instance of each per MariaDB VM for redundancy)

    Description

      Hello,

      Would it be possible to have an option to trigger only on changes to selected attributes of a record, rather than on a change to any attribute in the record as is currently the case?

      Thanks.

      Attachments

        Activity

          markus makela added a comment -

          I'd imagine that this sort of filtering is easier to implement in Kafka itself by processing the stream using Kafka Streams.
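
To illustrate the consumer-side filtering suggested above, here is a minimal sketch of the comparison step. It assumes the consumer has both the before and after image of an updated row available as column-to-value maps. The `Row` alias and the `watched_columns_changed` helper are hypothetical names for illustration, not part of MaxScale or Kafka.

```cpp
#include <map>
#include <set>
#include <string>

// Hypothetical row image: column name -> value rendered as a string.
using Row = std::map<std::string, std::string>;

// Returns true if any watched column differs between the two row images.
// A column present in only one of the images counts as changed.
bool watched_columns_changed(const Row& before, const Row& after,
                             const std::set<std::string>& watched)
{
    for (const auto& col : watched)
    {
        auto b = before.find(col);
        auto a = after.find(col);
        bool in_before = b != before.end();
        bool in_after = a != after.end();

        if (in_before != in_after)
        {
            return true;
        }

        if (in_before && b->second != a->second)
        {
            return true;
        }
    }

    return false;
}
```

A stream processor could forward an update event only when this check returns true and drop it otherwise.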

          markus makela added a comment -

          Something similar to what's done in the luafilter could be done for filtering the kafkacdc stream. A set of Lua functions could define whether the filtered records should be produced or not.

          Here's some (untested) prototype code that could serve as a starting point:

          // This is called by the Lua code
          int add_record_filter(lua_State *L)
          {
              LuaData* data = static_cast<LuaData*>(lua_touserdata(L, lua_upvalueindex(1)));
           
              //
              // Check that the arguments are correct...
              //
           
              size_t len = 0;
              const char* ptr;
              ptr = lua_tolstring(L, 2, &len);
              std::string database(ptr, len);
              ptr = lua_tolstring(L, 3, &len);
              std::string table(ptr, len);
              ptr = lua_tolstring(L, 4, &len);
              std::string column(ptr, len);
           
              // The last value on the stack is the callback function
              int cb = luaL_ref(L, LUA_REGISTRYINDEX);
              data->filters[database][table][column] = cb;
           
              return 0;
          }
           
          // This is called by the kafkacdc code for each column in a row
          bool should_keep_record(lua_State *L, LuaData* data, const std::string& db, const std::string& tbl, const std::string& col, const std::string& val)
          {
              bool keep = true;
           
              if (int ref = data->filters[db][tbl][col])
              {
                  lua_rawgeti(L, LUA_REGISTRYINDEX, ref);
                  lua_pushlstring(L, val.c_str(), val.size());
           
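                  // lua_pcall takes a fourth argument: the stack index of a message handler (0 for none)
                  if (lua_pcall(L, 1, 1, 0) == 0)
                  {
                      keep = lua_toboolean(L, -1);
                  }

                  // Pop the result (or the error message) to keep the stack balanced
                  lua_pop(L, 1);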
              }
           
              return keep;
          }
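
One detail worth noting about the lookup in the prototype: if `data->filters` is a nested `std::map` (an assumption, since the prototype does not show its type), then `operator[]` inserts an empty entry for every database, table, and column it is asked about. A lookup that leaves the maps untouched could be sketched as follows. The `FilterMap` aliases and the `find_filter_ref` helper are hypothetical names, and 0 is used for "no filter registered" to mirror the `if (int ref = ...)` check in the prototype.

```cpp
#include <map>
#include <string>

// Assumed layout of the filter registry:
// database -> table -> column -> Lua registry reference.
using ColumnFilters = std::map<std::string, int>;
using TableFilters = std::map<std::string, ColumnFilters>;
using FilterMap = std::map<std::string, TableFilters>;

// Returns the stored reference, or 0 when no filter is registered.
// Unlike operator[], find() never inserts entries into the maps.
int find_filter_ref(const FilterMap& filters, const std::string& db,
                    const std::string& tbl, const std::string& col)
{
    auto d = filters.find(db);
    if (d == filters.end())
    {
        return 0;
    }

    auto t = d->second.find(tbl);
    if (t == d->second.end())
    {
        return 0;
    }

    auto c = t->second.find(col);
    return c == t->second.end() ? 0 : c->second;
}
```

Since the check would run for each column of each row, avoiding the insertions keeps the registry from growing with every column name seen in the stream.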
          


          People

            JoeCotellese Joe Cotellese (Inactive)
            Presnickety Presnickety
            Votes: 0
            Watchers: 2
