class Object
Constants
- CHUNKS
-
— table UDF ————————————————————–
- CHUNK_SIZE
- CHUNK_SLEEP_SEC
- IDS
-
Pre-build data so data generation is not part of the measured work.
- NAMES
- ROWS
-
— scalar UDF ————————————————————-
- ROWS_PER_CHUNK
- SLEEP_EVERY
- SLEEP_SEC
- TYPES
Public Instance Methods
Source
# File sample/issue1136.rb, line 65
# Fills one output chunk and then pauses.
#
# Calls output.set_value(0, i, 1) for every i in 0...ROWS_PER_CHUNK, sets
# output.size to ROWS_PER_CHUNK, and finally sleeps for CHUNK_SLEEP_SEC.
#
# output - object responding to set_value and size=
#          (presumably a DuckDB output chunk; confirm against the caller).
#
# Returns the result of the final sleep call.
def emit_chunk(output)
  (0...ROWS_PER_CHUNK).each do |row|
    output.set_value(0, row, 1)
  end
  output.size = ROWS_PER_CHUNK
  sleep(CHUNK_SLEEP_SEC)
end
Source
# File sample/issue1136.rb, line 71
# Builds the bind callback for the table function.
#
# The returned proc adds a single BIGINT result column named 'v' and reports
# a cardinality of CHUNKS * ROWS_PER_CHUNK to bind_info.
#
# Returns a Proc taking one argument (bind_info).
def emitter_bind_block
  proc do |bind_info|
    bind_info.add_result_column('v', DuckDB::LogicalType::BIGINT)
    # Tell the planner there is real work so it distributes across workers.
    estimated_rows = CHUNKS * ROWS_PER_CHUNK
    bind_info.set_cardinality(estimated_rows, false)
  end
end
Source
# File sample/issue1136.rb, line 79
# Builds the execute callback for the table function.
#
# The returned proc records the calling Ruby thread in threads_seen, then
# claims one of CHUNKS chunks under a mutex. While chunks remain it delegates
# to emit_chunk; once they are used up it sets output.size to 0.
#
# threads_seen - Hash that receives Thread.current => true on every call.
#
# Returns a Proc taking (_info, output).
def emitter_execute_block(threads_seen)
  chunks_left = CHUNKS
  lock = Mutex.new
  proc do |_info, output|
    threads_seen[Thread.current] = true
    # Decrement and test in one critical section so each chunk is claimed once.
    claimed = lock.synchronize do
      chunks_left -= 1
      chunks_left >= 0
    end
    if claimed
      emit_chunk(output)
    else
      output.size = 0
    end
  end
end
Source
# File sample/issue1136.rb, line 46
# Runs the scalar-UDF measurement with the given DuckDB thread setting.
#
# Opens a fresh database, sets `threads`, creates table t with ROWS integer
# rows, registers slow_triple, and times SUM(slow_triple(value)).
#
# threads - value interpolated into "SET threads=...".
#
# Returns [elapsed, number of distinct threads recorded in threads_seen, sum].
def measure_scalar(threads)
  db = DuckDB::Database.open
  con = db.connect
  con.execute("SET threads=#{threads}")
  con.execute("CREATE TABLE t AS SELECT range::INTEGER AS value FROM range(#{ROWS})")
  threads_seen = {}
  register_slow_triple(con, threads_seen)
  elapsed, sum = timed_sum(con, 'SELECT SUM(slow_triple(value)) FROM t')
  [elapsed, threads_seen.size, sum]
ensure
  # Release the handles even when a statement above raises.
  con&.close
  db&.close
end
Source
# File sample/issue1136.rb, line 99
# Runs the table-UDF measurement with the given DuckDB thread setting.
#
# Opens a fresh database, sets `threads`, registers slow_emitter, and times
# SUM(v) over slow_emitter().
#
# threads - value interpolated into "SET threads=...".
#
# Returns [elapsed, number of distinct threads recorded in threads_seen, sum].
def measure_table(threads)
  db = DuckDB::Database.open
  con = db.connect
  con.execute("SET threads=#{threads}")
  threads_seen = {}
  register_slow_emitter(con, threads_seen)
  elapsed, sum = timed_sum(con, 'SELECT SUM(v) FROM slow_emitter()')
  [elapsed, threads_seen.size, sum]
ensure
  # Release the handles even when a statement above raises.
  con&.close
  db&.close
end
Source
# File benchmark/appender_ips.rb, line 28
# Recreates table t (id INTEGER, name VARCHAR) from scratch.
#
# con - object responding to query(sql).
#
# Returns the result of the CREATE TABLE query.
def new_table(con)
  statements = [
    'DROP TABLE IF EXISTS t',
    'CREATE TABLE t (id INTEGER, name VARCHAR)'
  ]
  statements.map { |sql| con.query(sql) }.last
end
Source
# File sample/issue922_benchmark.rb, line 120
# Writes data_frame to a Parquet file, loads that file into table `name`,
# and returns every row of the table.
#
# con          - object responding to query(sql).
# data_frame   - object responding to write_parquet(path).
# name         - table name, interpolated into the SQL as-is (trusted input).
# parquet_path - path of the Parquet file to write and read back.
#
# Returns the result of `SELECT * FROM name` converted with to_a.
def query_via_parquet(con, data_frame, name, parquet_path)
  data_frame.write_parquet(parquet_path)
  # Double any single quote so the path stays one SQL string literal.
  quoted_path = parquet_path.to_s.gsub("'", "''")
  con.query("CREATE OR REPLACE TABLE #{name} AS SELECT * FROM read_parquet('#{quoted_path}')")
  con.query("SELECT * FROM #{name}").to_a
end
Source
# File sample/issue930_benchmark.rb, line 35
# Loads a CSV into table `name`: creates the table with one VARCHAR column
# per header, then inserts each row with its own INSERT statement.
#
# con  - object responding to execute(sql).
# csv  - object responding to first (a row with headers), rewind and each;
#        each row yields [header, value] pairs (presumably a CSV opened with
#        headers enabled; confirm against the caller).
# name - table name, interpolated into the SQL as-is (trusted input).
#
# Returns the result of csv.each.
def register_as_table_with_create_table(con, csv, name)
  headers = csv.first.headers
  # Reading the first row consumed it; rewind so the insert loop sees all rows.
  csv.rewind
  column_defs = headers.map { |header| "#{header} VARCHAR" }.join(', ')
  con.execute("CREATE OR REPLACE TABLE #{name} (#{column_defs})")
  csv.each do |row|
    values = row.map do |_header, value|
      # Double any single quote so the value stays one SQL string literal.
      escaped = value.to_s.gsub("'", "''")
      "'#{escaped}'"
    end
    con.execute("INSERT INTO #{name} VALUES (#{values.join(', ')})")
  end
end
Source
# File sample/issue1136.rb, line 89
# Registers the 'slow_emitter' table function on the given connection.
#
# The function is wired to emitter_bind_block for binding and to
# emitter_execute_block(threads_seen) for execution.
#
# con          - connection responding to register_table_function.
# threads_seen - Hash handed to the execute callback to record calling threads.
#
# Returns the result of con.register_table_function.
def register_slow_emitter(con, threads_seen)
  slow_emitter = DuckDB::TableFunction.new.tap do |fn|
    fn.name = 'slow_emitter'
    fn.bind(&emitter_bind_block)
    # Without max_threads DuckDB assigns a single worker and the proxy never fires.
    fn.init { |init_info| init_info.max_threads = 4 }
    fn.execute(&emitter_execute_block(threads_seen))
  end
  con.register_table_function(slow_emitter)
end
Source
# File sample/issue1136.rb, line 27
# Registers the 'slow_triple' scalar function (INTEGER -> BIGINT) on the
# given connection.
#
# The callback records the calling Ruby thread in threads_seen, sleeps for
# SLEEP_SEC whenever the input is a multiple of SLEEP_EVERY, and returns the
# input multiplied by 3.
#
# con          - connection responding to register_scalar_function.
# threads_seen - Hash that receives Thread.current => true on every call.
#
# Returns the result of con.register_scalar_function.
def register_slow_triple(con, threads_seen)
  slow_triple = DuckDB::ScalarFunction.new.tap do |fn|
    fn.name = 'slow_triple'
    fn.add_parameter(DuckDB::LogicalType::INTEGER)
    fn.return_type = DuckDB::LogicalType::BIGINT
    fn.set_function do |value|
      threads_seen[Thread.current] = true
      pause_due = (value % SLEEP_EVERY).zero?
      sleep(SLEEP_SEC) if pause_due
      value * 3
    end
  end
  con.register_scalar_function(slow_triple)
end
Source
# File sample/issue1136.rb, line 113
# Prints one result line per thread setting for a measurement.
#
# For each entry of thread_counts the block is called with that thread count
# and must return [elapsed, distinct, sum]. The sum is checked against
# expected before the line is printed.
#
# label         - heading printed before the results.
# expected      - value every returned sum must equal.
# thread_counts - thread settings to try; defaults to [1, 4].
#
# Raises RuntimeError if a returned sum differs from expected.
# Returns thread_counts.
def report(label, expected, thread_counts: [1, 4])
  puts "#{label}:"
  thread_counts.each do |threads|
    elapsed, distinct, sum = yield(threads)
    raise "wrong result: #{sum} (expected #{expected})" unless sum == expected

    puts " SET threads=#{threads}: #{elapsed.round(3)}s, callbacks ran on #{distinct} distinct Ruby thread(s)"
  end
end
— run both —————————————————————
Source
# File sample/issue1136.rb, line 40
# Executes a query and measures how long it takes on the monotonic clock.
#
# con   - object responding to execute(query); the result must respond to first.
# query - SQL text to run.
#
# Returns [elapsed seconds, first column of the first result row].
def timed_sum(con, query)
  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  began_at = monotonic_now.call
  first_row = con.execute(query).first
  total = first_row.first
  [monotonic_now.call - began_at, total]
end
Source
# File benchmark/appender_ips.rb, line 33
# Yields an appender for a freshly created table t, flushes it after the
# block returns, and always closes the connection and database.
#
# Returns the result of the appender's flush.
def with_appender
  database = DuckDB::Database.open
  connection = database.connect
  new_table(connection)
  connection.appender('t').tap { |appender| yield appender }.flush
ensure
  # Safe navigation: either handle may be nil if opening failed.
  connection&.close
  database&.close
end