Class ActiveWarehouse::Aggregate::DwarfAggregate
In: lib/active_warehouse/aggregate/dwarf_aggregate.rb
Parent: Aggregate

Implementation of the Dwarf algorithm described in

Methods

Included Modules

DwarfCommon

Classes and Modules

Class ActiveWarehouse::Aggregate::DwarfAggregate::Cell
Class ActiveWarehouse::Aggregate::DwarfAggregate::Node

Attributes

number_of_dimensions  [RW]  Accessor for the number of dimensions in the cube.

Public Class methods

Initialize the aggregate

[Source]

    # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 8
 8:       def initialize(cube_class)
 9:         super
10:       end

Public Instance methods

Aggregate the node by summing all of the values in the cells TODO: support aggregations other than sum

[Source]

    # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 76
76:       def calculate_aggregate(cells)
77:         value = Array.new(cells.first.value.length, 0)
78:         cells.each do |c|
79:           c.value.each_with_index do |v, index|
80:             value[index] += v
81:           end
82:         end
83:         value
84:       end

Calculates a common prefix between the two tuples

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 185
185:       def calculate_prefix(current_tuple, last_tuple)
186:         return [] if last_tuple.nil?
187:         prefix = []
188:         last_matched_index = nil
189:         0.upto(number_of_dimensions) do |i|
190:           if current_tuple[i] == last_tuple[i]
191:             prefix << current_tuple[i]
192:           else
193:             break
194:           end
195:         end
196:         prefix
197:       end

Close all of the last nodes that match the specified prefix and return the list of newly closed nodes

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 201
201:       def close_nodes(prefix)
202:         new_closed = []
203:         if @last_nodes
204:           @last_nodes[prefix.length + 1, @last_nodes.length].each do |n|
205:             n.closed = true
206:             new_closed << n
207:           end
208:         end
209:         new_closed
210:       end

Create the dwarf cube with the sorted_facts

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 87
 87:       def create_dwarf_cube(sorted_facts)
 88:         last_tuple = nil
 89:         @last_nodes = nil
 90:         sorted_facts.each do |row|
 91:           tuple = row.is_a?(Hash) ? create_tuple(row) : row
 92:           
 93:           prefix = calculate_prefix(tuple, last_tuple)
 94:           
 95:           close_nodes(prefix).each do |n|
 96:             if n.leaf?
 97:               n.all_cell = Cell.new('*', calculate_aggregate(n.cells))
 98:             else
 99:               n.all_cell = Cell.new('*')
100:               n.all_cell.child = suffix_coalesce(n.children)
101:             end
102:             n.processed = true
103:           end
104:           
105:           nodes = create_nodes(tuple, prefix)
106:           
107:           write_nodes(nodes)
108:           last_tuple = tuple
109:           if @last_nodes.nil? then @root_node = nodes.first end
110:           @last_nodes = nodes
111:         end
112:         
113:         # Alg 1, Line 13
114:         last_leaf_node = @last_nodes.last
115:         last_leaf_node.all_cell = Cell.new('*', calculate_aggregate(last_leaf_node.cells))
116:         
117:         # Alg 1, Line 14
118:         @last_nodes[0..@last_nodes.length - 2].reverse.each do |n|
119:           n.all_cell = Cell.new('*')
120:           n.all_cell.child = suffix_coalesce(n.children)
121:         end
122:         
123:         require File.dirname(__FILE__) + '/dwarf_printer'
124:         puts DwarfPrinter.print_node(@root_node)
125:       end

Create the nodes for the current tuple

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 213
213:       def create_nodes(current_tuple, prefix)
214:         nodes = []
215:         new_nodes_needed_for = []
216:         if @last_nodes.nil?
217:           0.upto(number_of_dimensions - 1) do |i|
218:             k = current_tuple[i]
219:             parent_cell = (nodes.last.nil?) ? nil : nodes.last.cells.last
220:             nodes << Node.new(k, parent_cell)
221:           end
222:         else
223:           if prefix.length > 0
224:             0.upto(prefix.length - 1) do |i|
225:               nodes << @last_nodes[i]
226:             end
227:           end
228:           k = current_tuple[prefix.length]
229:           n = @last_nodes[prefix.length]
230:           n.add_cell(Cell.new(k))
231:           nodes << n
232:           
233:           (prefix.length + 1).upto(number_of_dimensions - 1) do |i|
234:             k = current_tuple[i]
235:             parent_cell = (nodes.last.nil?) ? nil : nodes.last.cells.last
236:             nodes << Node.new(k, parent_cell)
237:           end
238:         end
239:         
240:         nodes.last.leaf = true
241:         cell = nodes.last.cells.last
242:         unless cell.value
243:           cell.value = current_tuple[number_of_dimensions..current_tuple.length-1]
244:         end
245:         
246:         nodes
247:       end

[Source]

    # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 59
59:       def filter_nodes(node, dimension_ids, depth, filtered_nodes)
60:         #puts "filtering node #{print_node(node, depth, false)}"
61:         dimension = dimension_order[depth]
62:         #puts "dimension at #{depth} is #{dimension}"
63:         node.cells.each do |c|
64:           if dimension_ids[dimension].include?(c.key)
65:             if depth == dimension_order.length - 1
66:               filtered_nodes << node
67:             else
68:               filter_nodes(c.child, dimension_ids, depth+1, filtered_nodes) unless c.child.nil?
69:             end
70:           end
71:         end
72:       end

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 180
180:       def number_of_dimensions
181:         @number_of_dimensions ||= cube_class.dimension_classes.length
182:       end

Populate the aggregate

[Source]

    # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 13
13:       def populate
14:         create_dwarf_cube(sorted_facts)
15:       end

query

[Source]

    # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 18
18:       def query(*args)
19:         options = parse_query_args(*args)
20:         
21:         column_dimension_name = options[:column_dimension_name]
22:         column_hierarchy_name = options[:column_hierarchy_name]
23:         row_dimension_name = options[:row_dimension_name]
24:         row_hierarchy_name = options[:row_hierarchy_name]
25:         conditions = options[:conditions]
26:         cstage = options[:cstage]
27:         rstage = options[:rstage]
28:         filters = options[:filters]
29:         
30:         column_dimension = Dimension.class_for_name(column_dimension_name)
31:         row_dimension = Dimension.class_for_name(row_dimension_name)
32:         column_hierarchy = column_dimension.hierarchy(column_hierarchy_name)
33:         row_hierarchy = row_dimension.hierarchy(row_hierarchy_name)
34:         dimension_ids = {}
35:         
36:         dimension_order.each do |d|
37:           where_clause = []
38:           sql = "SELECT id FROM #{d.table_name}"
39:           filters.each do |key, value|
40:             dimension, column = key.split('.')
41:             if d.table_name == dimension
42:               where_clause << "#{dimension}.#{column} = '#{value}'" # TODO: protect from SQL injection
43:             end
44:           end
45:           sql += %Q(\nWHERE\n  #{where_clause.join(" AND\n  ")}) if where_clause.length > 0
46:           dimension_ids[d] = cube_class.connection.select_values(sql)
47:         end
48:         #puts "dimension ids: #{dimension_ids.inspect}"
49:         
50:         values = Array.new(cube_class.fact_class.aggregate_fields.length, 0)
51:         
52:         home_nodes = []
53:         filter_nodes(@root_node, dimension_ids, 0, home_nodes)
54:         #puts "filtered nodes: #{home_nodes.collect(&:id)}"
55:         
56:         values
57:       end

Get a list of sorted keys for the cells in the specified nodes

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 168
168:       def sorted_keys(nodes)
169:         keys = []
170:         nodes.each do |n|
171:           n.cells.each do |c|
172:             keys << c.key
173:           end
174:         end
175:         keys.uniq.sort { |a, b| a <=> b }
176:       end

Coalesce the nodes and return a single node

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 128
128:       def suffix_coalesce(nodes)
129:         if nodes.length == 1
130:           return nodes[0]
131:         else
132:           sub_dwarf = Node.new
133:           sub_dwarf.leaf = nodes.first.leaf
134:           
135:           keys = sorted_keys(nodes)
136:           keys.each do |k|
137:             to_merge = []
138:             nodes.each do |n|
139:               n.cells.each do |c|
140:                 to_merge << c if c.key == k
141:               end
142:             end
143:             
144:             if sub_dwarf.leaf?
145:               cur_aggr = calculate_aggregate(to_merge)    # Alg 2, Line 8
146:               sub_dwarf.add_cell(Cell.new(k, cur_aggr))   # Alg 2, Line 9
147:             else
148:               # Alg 2, Line 11
149:               cell = Cell.new(k)
150:               cell.child = suffix_coalesce(to_merge.collect{|c| c.child})
151:               sub_dwarf.add_cell(cell)
152:             end
153:           end
154:           
155:           if sub_dwarf.leaf?
156:             sub_dwarf.all_cell = Cell.new("*", calculate_aggregate(sub_dwarf.cells))
157:           else
158:             cell = Cell.new("*")
159:             cell.child = suffix_coalesce(sub_dwarf.children)
160:             sub_dwarf.all_cell = cell
161:           end
162:         end
163:         
164:         sub_dwarf
165:       end

Write nodes to the filesystem.

[Source]

     # File lib/active_warehouse/aggregate/dwarf_aggregate.rb, line 250
250:       def write_nodes(nodes)
251:         # open(File.new(cube_class.name + '.dat'), 'w') do |f|
252: #           
253: #         end
254:       end

[Validate]