| Class | ActiveWarehouse::Aggregate::DwarfAggregate |
| In: |
lib/active_warehouse/aggregate/dwarf_aggregate.rb
|
| Parent: | Aggregate |
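Implementation of the Dwarf algorithm described in the paper "Dwarf: Shrinking the PetaCube" (Sismanis, Deligiannakis, Roussopoulos and Kotidis, SIGMOD 2002).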
| number_of_dimensions | [RW] | Accessor for the number of dimensions in the cube. |
Initialize the aggregate
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 8
# Set up the aggregate for +cube_class+. Nothing is done here beyond
# handing the cube class to the parent class initializer.
def initialize(cube_class)
  super(cube_class)
end
Aggregate the node by summing all of the values in the cells. TODO: support aggregations other than sum.
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 76
# Sum the values of +cells+ position by position.
#
# Every cell's +value+ is iterated with an index, and each entry is added to
# the running total at the same index. The result has as many entries as the
# first cell's value. An empty +cells+ list yields an empty array instead of
# failing on +cells.first+ being nil.
#
# TODO: support aggregations other than sum
def calculate_aggregate(cells)
  return [] if cells.empty?

  totals = Array.new(cells.first.value.length, 0)
  cells.each do |cell|
    cell.value.each_with_index { |amount, index| totals[index] += amount }
  end
  totals
end
Calculates a common prefix between the two tuples
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 185
# Return the leading dimension keys that +current_tuple+ shares with
# +last_tuple+, stopping at the first position where they differ.
#
# Returns an empty array when there is no previous tuple. Only the first
# +number_of_dimensions+ positions are compared: create_nodes slices the
# positions from +number_of_dimensions+ onward off as the leaf cell value,
# so they must never become part of the prefix.
def calculate_prefix(current_tuple, last_tuple)
  return [] if last_tuple.nil?

  prefix = []
  number_of_dimensions.times do |i|
    break unless current_tuple[i] == last_tuple[i]

    prefix << current_tuple[i]
  end
  prefix
end
Close all of the last nodes that match the specified prefix and return the list of newly closed nodes
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 201
# Mark as closed every node in @last_nodes that sits more than one level
# below the shared +prefix+, and return those nodes.
#
# The node at index +prefix.length+ is left open. Returns an empty array
# when there are no previous nodes, or when the prefix already reaches past
# the last node (the old slice returned nil in that case and raised).
def close_nodes(prefix)
  return [] unless @last_nodes

  newly_closed = @last_nodes.drop(prefix.length + 1)
  newly_closed.each { |node| node.closed = true }
  newly_closed
end
Create the dwarf cube with the sorted_facts
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 87
# Build the dwarf cube from +sorted_facts+.
#
# Each row is either a Hash (converted with create_tuple) or is used as the
# tuple directly. For every tuple the prefix shared with the previous tuple
# is computed, the nodes below that prefix are closed and given their ALL
# cell, and the nodes for the new tuple are created and written. The first
# node of the first tuple becomes @root_node.
#
# Returns nil. With no rows at all nothing is built and the method returns
# before the final ALL-cell pass instead of failing on a nil @last_nodes.
# NOTE(review): the name suggests rows must arrive sorted by dimension key
# so that shared prefixes are adjacent - confirm against the caller.
def create_dwarf_cube(sorted_facts)
  last_tuple = nil
  @last_nodes = nil

  sorted_facts.each do |row|
    tuple = row.is_a?(Hash) ? create_tuple(row) : row
    prefix = calculate_prefix(tuple, last_tuple)

    close_nodes(prefix).each do |n|
      if n.leaf?
        n.all_cell = Cell.new('*', calculate_aggregate(n.cells))
      else
        n.all_cell = Cell.new('*')
        n.all_cell.child = suffix_coalesce(n.children)
      end
      n.processed = true
    end

    nodes = create_nodes(tuple, prefix)
    write_nodes(nodes)

    last_tuple = tuple
    @root_node = nodes.first if @last_nodes.nil?
    @last_nodes = nodes
  end

  # No rows were processed, so there is no path left to finish.
  return if @last_nodes.nil?

  # Alg 1, Line 13
  last_leaf_node = @last_nodes.last
  last_leaf_node.all_cell = Cell.new('*', calculate_aggregate(last_leaf_node.cells))

  # Alg 1, Line 14
  # 0...-1 skips the leaf even when it is the only node; the former
  # 0..length - 2 turned into 0..-1 (the whole array) for a single node and
  # overwrote the leaf's ALL cell.
  @last_nodes[0...-1].reverse.each do |n|
    n.all_cell = Cell.new('*')
    n.all_cell.child = suffix_coalesce(n.children)
  end

  # Debug output of the finished structure.
  require File.dirname(__FILE__) + '/dwarf_printer'
  puts DwarfPrinter.print_node(@root_node)
end
Create the nodes for the current tuple
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 213
# Build the list of nodes, one per dimension level, for +current_tuple+.
#
# On the first tuple (@last_nodes is nil) a new node is created for every
# level. Otherwise the nodes covered by +prefix+ are reused from
# @last_nodes, the node at index +prefix.length+ gets a new cell for the
# tuple's key at that level, and new nodes are created for the levels below.
# Each new node is constructed with the last cell of the node above it.
#
# The last node is flagged as a leaf, and its last cell receives the tuple
# entries from index +number_of_dimensions+ onward unless it already has a
# value. Returns the array of nodes.
def create_nodes(current_tuple, prefix)
  if @last_nodes.nil?
    nodes = []
    first_new_level = 0
  else
    nodes = @last_nodes[0, prefix.length]
    branch_node = @last_nodes[prefix.length]
    branch_node.add_cell(Cell.new(current_tuple[prefix.length]))
    nodes << branch_node
    first_new_level = prefix.length + 1
  end

  first_new_level.upto(number_of_dimensions - 1) do |level|
    node_above = nodes.last
    parent_cell = node_above.nil? ? nil : node_above.cells.last
    nodes << Node.new(current_tuple[level], parent_cell)
  end

  leaf_node = nodes.last
  leaf_node.leaf = true
  leaf_cell = leaf_node.cells.last
  unless leaf_cell.value
    leaf_cell.value = current_tuple[number_of_dimensions..current_tuple.length - 1]
  end

  nodes
end
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 59
# Walk the tree from +node+ and collect into +filtered_nodes+ the nodes at
# the last dimension level that are reachable through permitted cells.
#
# +dimension_ids+ maps each entry of +dimension_order+ to the keys allowed
# at that level, and +depth+ is the index of the current level. At the last
# level the node is appended once for every permitted cell it holds; above
# it the walk descends into each permitted cell's child, if it has one.
def filter_nodes(node, dimension_ids, depth, filtered_nodes)
  allowed_keys = dimension_ids[dimension_order[depth]]

  node.cells.each do |cell|
    next unless allowed_keys.include?(cell.key)

    if depth == dimension_order.length - 1
      filtered_nodes << node
    elsif !cell.child.nil?
      filter_nodes(cell.child, dimension_ids, depth + 1, filtered_nodes)
    end
  end
end
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 180
# Number of dimensions in the cube. Uses the value already stored in
# @number_of_dimensions when it is set; otherwise counts the cube class's
# dimension classes and remembers the result.
def number_of_dimensions
  return @number_of_dimensions if @number_of_dimensions

  @number_of_dimensions = cube_class.dimension_classes.length
end
Populate the aggregate
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 13
# Populate the aggregate by building the dwarf cube from the facts
# returned by +sorted_facts+.
def populate
  facts = sorted_facts
  create_dwarf_cube(facts)
end
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 18
# Query the aggregate.
#
# The arguments are passed to +parse_query_args+. From the resulting options
# this method reads the column/row dimension and hierarchy names and
# +:filters+, a hash of "table.column" => value pairs. For every dimension
# in +dimension_order+ it selects the ids of the rows matching the filters
# on that dimension's table, then collects the matching nodes of the cube
# with +filter_nodes+.
#
# Returns an array with one zero per aggregate field of the fact class; the
# collected nodes are not folded into it yet.
# NOTE(review): the original also read :conditions, :cstage and :rstage
# from the options without using them; those reads are dropped here.
def query(*args)
  options = parse_query_args(*args)

  column_dimension = Dimension.class_for_name(options[:column_dimension_name])
  row_dimension = Dimension.class_for_name(options[:row_dimension_name])
  # The hierarchies are resolved but not used yet; the calls are kept so the
  # lookups (and any error they raise) still happen as before.
  column_dimension.hierarchy(options[:column_hierarchy_name])
  row_dimension.hierarchy(options[:row_hierarchy_name])

  # A missing :filters option means "no filtering".
  filters = options[:filters] || {}
  connection = cube_class.connection
  dimension_ids = {}

  dimension_order.each do |d|
    where_clause = []
    filters.each do |key, value|
      table, column = key.split('.')
      next unless d.table_name == table

      # The value is escaped by the connection so it cannot break out of the
      # SQL string literal. NOTE(review): the table and column names still
      # come straight from the filter key - confirm the keys are trusted.
      where_clause << "#{table}.#{column} = #{connection.quote(value.to_s)}"
    end

    sql = "SELECT id FROM #{d.table_name}"
    sql += %Q(\nWHERE\n #{where_clause.join(" AND\n ")}) unless where_clause.empty?
    dimension_ids[d] = connection.select_values(sql)
  end

  values = Array.new(cube_class.fact_class.aggregate_fields.length, 0)

  home_nodes = []
  filter_nodes(@root_node, dimension_ids, 0, home_nodes)

  values
end
Get a list of sorted keys for the cells in the specified nodes
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 168
# Collect the key of every cell in +nodes+, drop duplicates and return the
# keys in ascending order.
def sorted_keys(nodes)
  nodes.flat_map { |node| node.cells.map { |cell| cell.key } }.uniq.sort
end
Coalesce the nodes and return a single node
# File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 128
# Merge +nodes+ into a single node and return it.
#
# A single node is returned as is, and an empty list yields nil (the old
# code raised on +nodes.first+). Otherwise a new node is built that takes
# its leaf flag from the first input node and holds one cell per distinct
# key found in +nodes+, in sorted key order:
# * for a leaf, the cell value is the aggregate of all cells with that key;
# * otherwise the cell's child is the coalesced children of those cells.
# The new node's ALL cell is computed the same way from its own cells.
def suffix_coalesce(nodes)
  return nil if nodes.empty?
  return nodes[0] if nodes.length == 1

  sub_dwarf = Node.new
  sub_dwarf.leaf = nodes.first.leaf

  # Group the cells once instead of rescanning every node for every key.
  cells_by_key = nodes.flat_map { |n| n.cells }.group_by { |c| c.key }

  sorted_keys(nodes).each do |k|
    to_merge = cells_by_key[k]

    if sub_dwarf.leaf?
      cur_aggr = calculate_aggregate(to_merge) # Alg 2, Line 8
      sub_dwarf.add_cell(Cell.new(k, cur_aggr)) # Alg 2, Line 9
    else
      # Alg 2, Line 11
      cell = Cell.new(k)
      cell.child = suffix_coalesce(to_merge.collect { |c| c.child })
      sub_dwarf.add_cell(cell)
    end
  end

  if sub_dwarf.leaf?
    sub_dwarf.all_cell = Cell.new("*", calculate_aggregate(sub_dwarf.cells))
  else
    cell = Cell.new("*")
    cell.child = suffix_coalesce(sub_dwarf.children)
    sub_dwarf.all_cell = cell
  end

  sub_dwarf
end