Edit this page on github
Frequently asked questions on comdb2
Lua Triggers
The server can be set up to run a stored procedure when records are inserted, updated or deleted from a table. The stored procedure is called once per row that is changed. This is true even if the transaction changes several rows in one commit. The stored procedure receives one argument (an event) - it is a Lua table which contains data describing the changes to the row:
id = unique id associated with this event
name = table name which triggered event
type = add|del|upd
new = nil|{Lua-table with values which were updated/inserted}
old = nil|{Lua-table with values which were updated/deleted}
The trigger procedure can obtain the transaction-id for a given event by
passing it to db:get_event_tid()
.
The CREATE LUA TRIGGER statement creates the named-trigger when rows are inserted, updated and (or) deleted from the table.
Let us see a few examples. Consider a table t
with columns i
, j
, k
,
l
. We'd like to run stored procedure audit
which logs changes to t
into
table audit
. The stored procedure would like to log values of i, j when a new
row is inserted, log values of j, k when rows are updated and log values of k,
l when rows are deleted.
CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT OF i, j AND UPDATE OF j, k AND DELETE OF k, l)
If we wanted to log all columns when a row is inserted, the statement would look like:
CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT OF i, j, k, l)
If we skip column names, then all columns at the time of creating trigger are
considered. Keep in mind that these definitions are not updated automatically
when table t
is altered. Previous example could be written as:
CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT)
Statement to set up trigger on insert into multiple tables, say t1
and t2
would look like:
CREATE LUA TRIGGER audit ON (TABLE t1 FOR INSERT), (TABLE t2 FOR INSERT)
Behind the scenes, the database will set up a queue to save requested changes
to specified tables. The database will then select one node in the cluster to
start listening on this queue. When there is an item on the queue, the database
reads the item, starts a new transaction, creates a corresponding Lua table and
executes the stored procedure. When stored procedure completes successfully,
the database consumes this item from the queue and commits the transaction. The
database then goes back to listening for subsequent items on the queue.
db:begin()
and db:commit()
calls are not available to triggers as the
database starts one implicitly so all of stored procedure actions and
queue-consume operation are guaranteed to commit atomically.
The stored procedure should return 0 on success. Not returning a value or any other non-zero value will cause the stored procedure transaction to rollback. This implies that if trigger stored procedure transaction has any failure (ex. a failed insert, a duplicate error, etc.), that transaction will abort and the event will not be removed from the queue, rather the event will be reprocessed: i.e. stored procedure will be rerun (and if it fails, it will be retried, possibly indefinitely). Only if the stored procedure actions complete successfully, will the event be consumed from the queue.
Following is an example for a trigger audit
which logs all changes to table
t
. Table has two int fields: i
, j
. Stored procedure logs data to table
audit
, storing type of change to t
, time of log and the changed values.
cdb2sql testdb default - <<'EOF'
CREATE TABLE t(i int, j int)$$
CREATE TABLE audit(type cstring(4), tbl cstring(64), logtime datetime, i int, j int, old_i int, old_j int)$$
CREATE PROCEDURE audit VERSION 'sample' {
local function main(event)
local audit = db:table('audit')
local chg
if event.new ~= nil then
chg = event.new
end
if chg == nil then
chg = {}
end
if event.old ~= nil then
for k, v in pairs(event.old) do
chg['old_'..k] = v
end
end
chg.type = event.type
chg.tbl = event.name
chg.logtime = db:now()
return audit:insert(chg)
end
}$$
CREATE LUA TRIGGER audit ON (TABLE t FOR INSERT AND UPDATE AND DELETE)
INSERT INTO t VALUES(1,1),(1,2),(1,3),(1,4)
UPDATE t SET i = j WHERE j % 2 = 0
DELETE FROM t WHERE i % 2 <> 0
EOF
This should generate 8 events and our sample stored procedure should have
logged them in audit
.
cdb2sql testdb default "select * from audit order by logtime"
(type='add', tbl='t', logtime="2019-12-19T131422.330 America/New_York", i=1, j=1, old_i=NULL, old_j=NULL)
(type='add', tbl='t', logtime="2019-12-19T131422.331 America/New_York", i=1, j=2, old_i=NULL, old_j=NULL)
(type='add', tbl='t', logtime="2019-12-19T131422.332 America/New_York", i=1, j=3, old_i=NULL, old_j=NULL)
(type='add', tbl='t', logtime="2019-12-19T131422.332 America/New_York", i=1, j=4, old_i=NULL, old_j=NULL)
(type='upd', tbl='t', logtime="2019-12-19T131422.333 America/New_York", i=2, j=2, old_i=1, old_j=2)
(type='upd', tbl='t', logtime="2019-12-19T131422.334 America/New_York", i=4, j=4, old_i=1, old_j=4)
(type='del', tbl='t', logtime="2019-12-19T131422.334 America/New_York", i=NULL, j=NULL, old_i=1, old_j=1)
(type='del', tbl='t', logtime="2019-12-19T131422.335 America/New_York", i=NULL, j=NULL, old_i=1, old_j=3)
The DROP LUA TRIGGER
statement will stop execution of the store procedure and
remove the queue associated with the trigger. To delete our example trigger, we
will run the following statement:
DROP LUA TRIGGER audit
Lua Consumers
The CREATE LUA CONSUMER
statement creates the
named-consumer for INSERT/UPDATE/DELETE of specified fields from the specified
table. A Lua consumer is similar in mechanics to a Lua trigger. Instead of the
database running the stored procedure automatically, this requires a client
program to run 'EXEC PROCEDURE sp-name()'. Additionally, there is a mechanism
to send back data to the calling program and block until client signals that
data is consumed. This is different from regular server-client protocol which
streams rows and blocks only when socket is full.
Let us study a sample consumer. The client program would like to receive all
changes to table t
. The example uses cdb2sql
, however a real application
would use the API and make and the usual calls (cdb2_run_statement
,
cdb2_next_record
) to run the stored procedure and obtain rows. Buffering in
cdb2sql
may cause the rows to not appear immediately.
cdb2sql testdb default - <<'EOF'
CREATE TABLE t(i int, d double, c cstring(10), b byte(4), t datetime, y intervalym)$$
CREATE PROCEDURE watch VERSION 'sample' {
local function define_emit_columns()
local num = db:exec("select count(*) as num from comdb2_columns where tablename='t'"):fetch().num
--temporary kludge: convert to Lua number
num = tonumber(tostring(num))
local total = 2 + num * 2 -- type, tablename, new columns, old columns
db:num_columns(total)
local i = 0
local cols = {}
i = i + 1
table.insert(cols, i, {n='tbl', t='cstring'})
i = i + 1
table.insert(cols, i, {n='type', t='cstring'})
local stmt = db:exec("select columnname as name, type as type from comdb2_columns where tablename='t'")
local r = stmt:fetch()
while r do
local n = tostring(r.name)
local t = tostring(r.type)
i = i + 1
table.insert(cols, i, {n = n, t= t})
table.insert(cols, i + num, {n = 'old_'..n, t = t})
r = stmt:fetch()
end
for k, v in ipairs(cols) do
db:column_name(v.n, k)
db:column_type(v.t, k)
end
end
local function main()
define_emit_columns()
-- get handle to consumer associated with stored procedure
local consumer = db:consumer()
while true do
local change = consumer:get() -- blocking call
local row
if change.new ~= nil then
row = change.new
end
if row == nil then
row = {}
end
if change.old ~= nil then
for k, v in pairs(change.old) do
row['old_'..k] = v
end
end
row.tbl = change.name
row.type = change.type
consumer:emit(row) -- blocking call
consumer:consume()
end
end
}$$
CREATE LUA CONSUMER watch ON (TABLE t FOR INSERT AND UPDATE AND DELETE)
INSERT INTO t VALUES(1, 22.0/7, 'hello', x'deadbeef', now(), 5)
UPDATE t set i = i + d, d = d + i, b = x'600dc0de'
DELETE from t
EOF
A client which executes the watch
procedure will receive the following rows:
cdb2sql testdb default "EXEC PROCEDURE watch()"
(tbl='t', type='add', i=1, d=3.142857, c='hello', b=x'deadbeef', t="2019-12-19T200838.418 ", y="0-5", old_i=NULL, old_d=NULL, old_c=NULL, old_b=NULL, old_t=NULL, old_y=NULL)
(tbl='t', type='upd', i=4, d=4.142857, c='hello', b=x'600dc0de', t="2019-12-19T200838.418 ", y="0-5", old_i=1, old_d=3.142857, old_c='hello', old_b=x'deadbeef', old_t="2019-12-19T200838.418 ", old_y="0-5")
(tbl='t', type='del', i=NULL, d=NULL, c=NULL, b=NULL, t=NULL, y=NULL, old_i=4, old_d=4.142857, old_c='hello', old_b=x'600dc0de', old_t="2019-12-19T200838.418 ", old_y="0-5")
The system provides a unique id
for each event delivered to consumer. If
client successfully processed an event, but crashed before requesting next row,
the database will send the last event again. id
may be useful to detect such
a condition.
The dbconsumer:emit()
only returns when the client application requests the
next event by calling cdb2_next_record
. In the example above,
dbconsumer:consume()
starts a new transaction and consumes the last event.
The consumer may want to log the change (like the sample in Lua trigger), emit the event and consume event atomically. This can be accomplished by wrapping these operations in an explicit transaction as shown below:
local function main()
local audit = db:table("audit") -- from Lua trigger example
local consumer = db:consumer()
...
while true do
local change = consume:get()
-- massage change into audit and emit formats
...
db:begin()
audit:insert(lua_tbl_for_audit)
consumer:emit(lua_tbl_for_emit)
consumer:consume()
db:commit()
end
end
If several client application runs the same consumer stored procedure, the database will ensure that only one executes. The rest of the stored procedures will block in the call to obtain dbconsumer
handle (call to db:consumer()
will block). When the executing application disconnects, the server will pick any one of the outstanding clients to run next.
The DROP LUA CONSUMER
statement will stop execution of the store procedure and remove the queue associated with the consumer. To delete our example consumer, we will run the following statement:
DROP LUA CONSUMER watch
Consumer API
db:consumer
dbconsumer = db:consumer(x)
x: Optional Lua table to set timeout, etc
Description:
Returns a dbconsumer object associated with the stored procedure running a Lua consumer. This method blocks until registration succeeds with master node. It accepts an optional Lua table with following keys:
x.register_timeout = number (ms)
Specify timeout (in milliseconds) to wait for registration with master. For example, db:consumer({register_timeout = 5000})
will return nil
if registration fails after 5 seconds.
x.with_sequence = true | false (default)
When with_sequence
is true
, the Lua table returned by dbconsumer:get/poll()
includes an additional property (sequence
). If the trigger was created with the PERSISTENT_SEQUENCE
option enabled, then this sequence will be a monotonically increasing count of the items that have been enqueued.
x.with_epoch = true | false (default)
When with_epoch
is true
, the Lua table returned by dbconsumer:get/poll()
includes an additional property (epoch
) which contains the unix time-epoch (seconds since 00:00:00 January 1, 1970 UTC) at the time when this event was enqueued.
x.with_tid = true | false (default)
When with_tid
is true
, Lua table returned by dbconsumer:get/poll()
include additional property (tid
). This is the same tid
returned by db:get_event_tid()
db:get_event_tid
tid = db:get_event_tid(x)
x: Lua table returned by `dbconsumer:get/poll()` or argument passed to a `main` in Lua trigger.
Returns transaction-id (tid
) for a given event. All events belonging to same originating transaction will have the same tid
. This can be used by application to detect transaction boundaries as tid changes. tid
s are not unique like event's id
and will eventually recycle.
dbconsumer:get
lua-table = dbconsumer:get()
Description:
This method blocks until there an event available to consume. It returns a Lua table which contains data describing the event. It contains:
Key | Value |
---|---|
id | unique id associated with this event |
name | table name which triggered event |
type | "add" or "del" or "upd" |
new | nil or a Lua-table with values which were updated/inserted |
old | nil or Lua-table with values which were updated/deleted |
tid | optional transaction id (see with_tid above) |
dbconsumer:poll
lua-table = dbconsumer:poll(t)
t: number (ms)
Specify timeout (in milliseconds) to wait for event to be generated in the system. Similar to dbconsumer:get()
otherwise. Returns nil
if no event is avaiable after timeout.
dbconsumer:consume
Description:
Consumes the last event obtained by dbconsumer:get/poll()
. Creates a new transaction if no explicit transaction was ongoing.
dbconsumer:emit
Description:
Like db:emit()
, except it will block until the calling client requests next row by calling cdb2_next_record
.