Skip to content

Commit

Permalink
Fix live (#138)
Browse files Browse the repository at this point in the history
* Fix live

* Fix wrong iterator usage

* No need to escape

* Fix writter

* Refactoring

* rubycop

* Rubocopt

* Add def debug

* rubo

* Remove aggreg in aggreg2tally

* Add better live support

* remove useless variable

* typo

* Fix the fix.

---------

Co-authored-by: Thomas Applencourt <[email protected]>
Co-authored-by: Brice Videau <[email protected]>
  • Loading branch information
3 people authored Aug 28, 2023
1 parent ee26ccd commit af54e73
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 170 deletions.
296 changes: 161 additions & 135 deletions utils/babeltrace_thapi.in
Original file line number Diff line number Diff line change
@@ -1,31 +1,27 @@
#!/usr/bin/env ruby
DATADIR = File.join('@prefix@', 'share')
$:.unshift(DATADIR) if File.directory?(DATADIR)
$LOAD_PATH.unshift(DATADIR) if File.directory?(DATADIR)
require 'optparse'
require 'babeltrace2'
require 'find'

module MyRefinement

module BTComponentClassRefinement
refine(BT2::BTComponentClass) do
attr_accessor :plugin
attr_accessor :plugin
end

refine(BT2::BTPlugin.singleton_class) do

def find_all_from_dir(path, recurse: false, fail_on_load_error: true)
# I guess print a warning if recurse?
ps = super
ps.each { |p|
p.path = path
}
ps
# I guess print a warning if recurse?
ps = super
ps.each do |p|
p.path = path
end
ps
end

end

refine(BT2::BTPlugin) do

attr_accessor :path

def get_source_component_class_by_name(name)
Expand Down Expand Up @@ -63,18 +59,15 @@ module MyRefinement
cs.each { |c| c.plugin = self }
cs
end

end
refine(BT2::BTGraph.singleton_class) do

def new(handle = nil, retain: true, auto_release: true,
mip_version: 0)
mip_version: 0)
obj = super
obj.plugins_path = []
obj.cli_v = []
obj
end

end

refine(BT2::BTGraph) do
Expand All @@ -86,7 +79,7 @@ module MyRefinement

@plugins_path << component_class.plugin.path
@cli_v << "--component #{name}:#{component_class.type.to_s.split('_').last.downcase}.#{component_class.plugin.name}.#{component_class.name}"
str_params = params.filter_map { |k,v| "#{k}=#{v}" unless v.class == String && v.empty? }.join(",")
str_params = params.filter_map { |k, v| "#{k}=#{v}" unless v.instance_of?(String) && v.empty? }.join(',')
@cli_v << "--params #{str_params.dump}" unless str_params.empty?
super
end
Expand All @@ -97,18 +90,17 @@ module MyRefinement
end

def cli
l = ["babeltrace2"]
l = ['babeltrace2']
ps_u = @plugins_path.uniq.compact.sort
l << "--plugin-path=#{ps_u.join(",")}" unless ps_u.empty?
l << "run"
l << "--plugin-path=#{ps_u.join(',')}" unless ps_u.empty?
l << 'run'
l += @cli_v
l.join(" \\\n")
end

end
end

using MyRefinement
using BTComponentClassRefinement

# Don't complain about broken pipe
Signal.trap('SIGPIPE', 'SYSTEM_DEFAULT')
Expand All @@ -133,12 +125,120 @@ $options_tally = {
'backend_level' => [String, ''],
}

NameComp = Struct.new(:name, :comp)

def get_components(names)
# BT "native" component classes
components_classes = {
'source.ctf.fs' => BT2::BTPlugin.find('ctf').get_source_component_class_by_name('fs'),
'source.ctf.lttng_live' => BT2::BTPlugin.find('ctf').get_source_component_class_by_name('lttng-live'),
'filter.utils.muxer' => BT2::BTPlugin.find('utils').get_filter_component_class_by_name('muxer'),
'sink.text.pretty' => BT2::BTPlugin.find('text').get_sink_component_class_by_name('pretty'),
'sink.ctf.fs' => BT2::BTPlugin.find('ctf').get_sink_component_class_by_name('fs'),
'sink.utils.dummy' => BT2::BTPlugin.find('utils').get_sink_component_class_by_name('dummy'),
}

# THAPI components
thapi_plugins = BT2::BTPlugin.find_all_from_dir(File.join('@prefix@', 'lib'))
thapi_plugins.map do |pg|
pg.source_component_classes.each { |c| components_classes["source.#{pg.name}.#{c.name}"] = c }
pg.filter_component_classes.each { |c| components_classes["filter.#{pg.name}.#{c.name}"] = c }
pg.sink_component_classes.each { |c| components_classes["sink.#{pg.name}.#{c.name}"] = c }
end

names.flat_map do |name|
case name
when 'filter.intervals.interval'
$options[:backends].map { |b| NameComp.new(b, components_classes["filter.#{b}interval.interval"]) }
when 'sink.text.rubypretty'
# Yaml and event_lambdas are required by babeltrace*_lib
$event_lambdas = {}
require 'yaml'
require 'babeltrace_omp_lib' if $options[:backends].include?('omp')
require 'babeltrace_opencl_lib' if $options[:backends].include?('cl')
require 'babeltrace_ze_lib' if $options[:backends].include?('ze')
require 'babeltrace_cuda_lib' if $options[:backends].include?('cuda')
require 'babeltrace_hip_lib' if $options[:backends].include?('hip')
f = lambda { |iterator, _|
iterator.next_messages.each do |m|
next unless m.type == :BT_MESSAGE_TYPE_EVENT

e = m.event
l = $event_lambdas[e.name]
next unless l || !$options[:restrict]

str = Time.at(0, m.get_default_clock_snapshot.ns_from_origin, :nsec).strftime('%H:%M:%S.%9L').to_s
if $options[:context]
str << " - #{e.stream.trace.get_environment_entry_value_by_name('hostname')}"
str << ' - ' << e.get_common_context_field.value.collect do |k, v|
"#{k}: #{v}"
end.join(', ')
end
str << " - #{e.name}: "
str << (l ? l.call(e.payload_field.value) : e.payload_field.to_s)
puts str
end
}
NameComp.new('sink.text.rubypretty', f)
else
NameComp.new(name, components_classes[name])
end
end
end

def get_and_add_components(graph, names)
get_components(names).map do |nc|
name = nc.name
comp = nc.comp
case name
when 'sink.text.rubypretty'
graph.add_simple_sink('rubypretty', comp)
when 'source.ctf.lttng_live'
graph.add(comp, 'source_live',
params: { 'inputs' => $options[:inputs],
'session-not-found-action' => 'end' })
when 'source.ctf.fs'
Find.find(*ARGV)
.reject { |path| FileTest.directory?(path) }
.filter_map { |path| File.dirname(path) if File.basename(path) == 'metadata' }
.select do |path|
qe = BT2::BTQueryExecutor.new(component_class: comp, object_name: 'babeltrace.support-info',
params: { 'input' => path, 'type' => 'directory' })
qe.query.value['weight'] > 0.5
end
.each_with_index.map do |trace_location, i|
graph.add(comp, "source_#{i}",
params: { 'inputs' => [trace_location] })
end
when 'sink.ctf.fs'
graph.add(comp, 'ctf_sink',
params: { 'path' => $options[:output],
'assume-single-trace' => false,
'quiet' => $options[:debug] ? false : true })
when 'sink.btx_tally.tally'
graph.add(comp, 'tally',
params: $options_tally.transform_values { |_, v| v })
else
graph.add(comp, name)
end
end
end

def connects(graph, comps)
comps.each_cons(2) do |out, in_|
[out].flatten.flat_map(&:output_ports).each_with_index do |op, i|
ip = in_.input_port(i)
graph.connect_ports(op, ip)
end
end
end

def common_options(opts)
opts.on('-b', '--backend BACKEND', Array) do |v|
$options[:backends] = v
end

opts.on('--debug') do |v|
opts.on('--debug') do |_v|
$options[:debug] = true
end

Expand Down Expand Up @@ -171,6 +271,19 @@ subcommands = {
$options[:live] = true
end
end,
'live2aggreg' =>
OptionParser.new do |opts|
opts.banner = 'Usage: live2aggreg [OPTIONS]'
common_options(opts)

opts.on('--inputs=INPUTS') do |inputs|
$options[:inputs] = [inputs]
end

opts.on('--output=OUTPUT') do |output|
$options[:output] = output
end
end,
'tally' =>
OptionParser.new do |opts|
opts.banner = 'Usage: tally [OPTIONS] trace_directory...'
Expand All @@ -180,6 +293,17 @@ subcommands = {
$options[:live] = true
end

$options_tally.each do |k, (t, _)|
opts.on("--#{k}=VALUE", t) do |v|
$options_tally[k] = [t, v]
end
end
end,
'aggreg2tally' =>
OptionParser.new do |opts|
opts.banner = 'Usage: aggreg2tally [OPTIONS] trace_directory...'
common_options(opts)

$options_tally.each do |k, (t, _)|
opts.on("--#{k}=VALUE", t) do |v|
$options_tally[k] = [t, v]
Expand All @@ -205,123 +329,25 @@ subcommands[command].order!
# Fix segfault
ARGV.uniq!

# BT "native" component classes
components_classes = {
'source.ctf.fs' => BT2::BTPlugin.find('ctf').get_source_component_class_by_name('fs'),
'source.ctf.lttng_live' => BT2::BTPlugin.find('ctf').get_source_component_class_by_name('lttng-live'),
'filter.utils.muxer' => BT2::BTPlugin.find('utils').get_filter_component_class_by_name('muxer'),
'sink.text.pretty' => BT2::BTPlugin.find('text').get_sink_component_class_by_name('pretty'),
}

# THAPI components
thapi_plugins = BT2::BTPlugin.find_all_from_dir(File.join('@prefix@', 'lib'))
thapi_plugins.map do |pg|
pg.source_component_classes.each { |c| components_classes["source.#{pg.name}.#{c.name}"] = c }
pg.filter_component_classes.each { |c| components_classes["filter.#{pg.name}.#{c.name}"] = c }
pg.sink_component_classes.each { |c| components_classes["sink.#{pg.name}.#{c.name}"] = c }
end
thapi_graph = { 'tally' => ['source.ctf.fs', 'filter.utils.muxer', 'filter.intervals.interval',
'filter.btx_aggreg.aggreg', 'sink.btx_tally.tally'],
'timeline' => ['source.ctf.fs', 'filter.utils.muxer', 'filter.intervals.interval',
'sink.btx_timeline.timeline'],
'trace' => ['source.ctf.fs', 'filter.utils.muxer', 'sink.text.rubypretty'],
'live2aggreg' => ['source.ctf.lttng_live', 'filter.utils.muxer', 'filter.intervals.interval',
'filter.btx_aggreg.aggreg', 'sink.ctf.fs'],
'aggreg2tally' => ['source.ctf.fs', 'filter.btx_aggreg.aggreg', 'sink.btx_tally.tally'] }

graph = BT2::BTGraph.new

# Source components
if !$options[:live]
ctf_fs = components_classes.fetch('source.ctf.fs')

trace_locations =
Find.find(*ARGV).reject do |path|
FileTest.directory?(path)
end.select do |path|
File.basename(path) == 'metadata'
end.collect do |path|
File.dirname(path)
end.select do |path|
qe = BT2::BTQueryExecutor.new(component_class: ctf_fs, object_name: 'babeltrace.support-info',
params: { 'input' => path, 'type' => 'directory' })
qe.query.value['weight'] > 0.5
end

comp_sources = trace_locations.each_with_index.map do |trace_location, i|
graph.add_component(ctf_fs, "source_#{i}", params: { 'inputs' => [trace_location] })
end
else
trace_locations = ARGV
ctf_lttng_live = components_classes.fetch('source.ctf.lttng_live')
comp_sources = trace_locations.each_with_index.map do |trace_location, i|
graph.add_component(ctf_lttng_live, "source_#{i}",
params: { 'inputs' => [trace_location], 'session-not-found-action' => 'end' })
end
end

# Muxer components
comp_muxer = graph.add_component(components_classes.fetch('filter.utils.muxer'), 'muxer')

# Sources to muxer connection
comp_sources.flat_map(&:output_ports).each_with_index do |op, i|
ip = comp_muxer.input_port(i)
graph.connect_ports(op, ip)
end

# Rest of graph components
comps = []
case command
when 'trace'
# Yaml and event_lambdas are required by babeltrace*_lib
$event_lambdas = {}
require 'yaml'
require 'babeltrace_omp_lib' if $options[:backends].include?('omp')
require 'babeltrace_opencl_lib' if $options[:backends].include?('cl')
require 'babeltrace_ze_lib' if $options[:backends].include?('ze')
require 'babeltrace_cuda_lib' if $options[:backends].include?('cuda')
require 'babeltrace_hip_lib' if $options[:backends].include?('hip')

consume = lambda { |iterator, _|
iterator.next_messages.each do |m|
next unless m.type == :BT_MESSAGE_TYPE_EVENT

e = m.event
l = $event_lambdas[e.name]
next unless l || !$options[:restrict]

str = "#{Time.at(0, m.get_default_clock_snapshot.ns_from_origin, :nsec).strftime('%H:%M:%S.%9L')}"
if $options[:context]
str << " - #{e.stream.trace.get_environment_entry_value_by_name('hostname')}"
str << ' - ' << e.get_common_context_field.value.collect do |k, v|
"#{k}: #{v}"
end.join(', ')
end
str << " - #{e.name}: "
str << (l ? l.call(e.payload_field.value) : e.payload_field.to_s)
puts str
end
}
comps << graph.add_simple_sink('babeltrace_thapi', consume)
when 'tally'
$options[:backends].each do |name|
comps << graph.add_component(components_classes.fetch("filter.#{name}interval.interval"), "#{name}interval")
end
comps << graph.add_component(components_classes.fetch('filter.btx_aggreg.aggreg'), 'aggreg')
comps << graph.add_component(components_classes.fetch('sink.btx_tally.tally'), 'tally',
params: $options_tally.transform_values { |_, v| v })
when 'timeline'
$options[:backends].each do |name|
comps << graph.add_component(components_classes.fetch("filter.#{name}interval.interval"), "#{name}interval")
end
comps << graph.add_component(components_classes.fetch('sink.btx_timeline.timeline'), 'timeline')
end

# Muxer to serial part
[comp_muxer, comps].flatten.each_cons(2) do |_out, _in|
op = _out.output_port(0)
ip = _in.input_port(0)
graph.connect_ports(op, ip)
end
comp = get_and_add_components(graph, thapi_graph[command])
connects(graph, comp)

if $options[:debug]
cli = graph.cli
#puts cli
puts "babeltrace_thapi: babeltrace2 cli command will be saved in ./babeltrace_thapi_cli.sh"
# puts cli
puts 'babeltrace_thapi: babeltrace2 cli command will be saved in ./babeltrace_thapi_cli.sh'
$stdout.flush
File::open('babeltrace_thapi_cli.sh','w') { |f| f.write(cli) }
File.open('babeltrace_thapi_cli.sh', 'w') { |f| f.write(cli) }
end

graph.run
Loading

0 comments on commit af54e73

Please sign in to comment.