// // Copyright 2019 SenX S.A.S. // // This program is free software: you can redistribute it and/or modify it // under the terms of the GNU Affero General Public License as published // by the Free Software Foundation, version 3. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see // 300 s MACROTTL // Write token 'wtoken' MACROCONFIG 'WTOKEN' STORE // Number of GTS after which we should flush 'threshold' MACROCONFIG TOLONG 'THRESHOLD' STORE <% [] 'gts' CSTORE // // If there were no messages, flush the existing GTS // DUP ISNULL <% $gts !$WTOKEN UPDATE 'gts' FORGET STOP %> IFT // // Map for storing the chunks by message id // {} 'chunks' CSTORE // // Iterate over the messages // <% DROP // Extract the elements LIST-> DROP [ 'ip' 'port' 'msg' ] STORE // Extract the magic number $msg 0 2 SUBSTRING ->HEX 'magic' STORE // Gzipped message <% $magic '1e0f' == %> <% // Extract the message id $msg 2 8 SUBSTRING ->HEX 'msgid' STORE // Extract the chunk count $msg 11 1 SUBSTRING ->HEX FROMHEX 'chunkcount' STORE // Extract the chunk id $msg 10 1 SUBSTRING ->HEX FROMHEX 'chunkid' STORE // Extract the chunk list for the message id $chunks $msgid GET 'msgchunks' STORE // If the chunk list does not exist, create it with the current timestamp and chunk count as first two elements $msgchunks ISNULL <% [] NOW +! $chunkcount +! 'msgchunks' STORE $chunks $msgchunks $msgid PUT DROP %> IFT // Add the chunk to the list, keeping the chunkid so we can sort the chunks $msgchunks $msg 10 SUBSTRING +! DROP // If all the chunks have arrived, rebuild the message $msgchunks SIZE $chunkcount 2 + == <% // Remove the timestamp and chunk count, convert the data to hex and // sort the list so we get the chunks in the correct order $msgchunks [ 2 $chunkcount 1 + ] SUBLIST <% DROP ->HEX %> LMAP LSORT // Now remove the chunk id and chunk count from each hex string <% DROP 4 SUBSTRING %> LMAP // Now join the strings '' JOIN // Extract the magic of the merged chunks and update magic and msg DUP 0 4 SUBSTRING 'magic' STORE HEX-> 'msg' STORE // Clear the chunks from the 'chunks' map $chunks $msgid REMOVE DROP DROP %> <% // We don't have all the chunks, clear 'msg' so we end processing NULL 'msg' STORE %> IFTE %> IFT <% $magic '1f8b' == %> <% // Decompress gzipped message $msg UNGZIP 'msg' STORE %> // Gzipped GELF message <% $magic '7801' == %> <% // Decompress inflated message $msg INFLATE 'msg' STORE %> // ZLib compressed GELF message <% $magic '7b22' == %> <% // Do nothing for uncompressed messages %> // Uncompressed GELF message <% %> // Default action 3 // Number of cases SWITCH // Output MSG $msg %> LMAP // Now handle the messages we just decoded $gts SWAP <% DUP 'msg' STORE ISNULL NOT <% NEWGTS 'logs' RENAME // Extract elements which will be used as labels $msg 'UTF-8' BYTES-> JSON-> 'json' STORE { 'host' $json 'host' GET 'level' $json 'level' GET TOSTRING } RELABEL // Convert timestamp to platform's time units. // TODO(hbs): add sequence number to ensure we do not have collisions? $json 'timestamp' GET 1 s * TOLONG NaN NaN NaN $msg GZIP ->OPB64 ADDVALUE +! %> IFT %> FOREACH // If we have more than THRESHOLD GTS, flush them to Warp 10 and clear the 'gts' list SIZE !$THRESHOLD >= <% $gts !$WTOKEN UPDATE 'gts' FORGET %> IFT %>