@ -64,11 +64,7 @@ module Mastodon
progress . title = 'Estimating workload '
# Estimate the amount of data that has to be imported first
indices . each do | index |
index . types . each do | type |
progress . total = ( progress . total || 0 ) + type . adapter . default_scope . count
end
end
progress . total = indices . sum { | index | index . adapter . default_scope . count }
# Now import all the actual data. Mind that unlike chewy:sync, we don't
# fetch and compare all record IDs from the database and the index to
@ -80,67 +76,68 @@ module Mastodon
batch_size = 1_000
# Divide as floats so that #ceil really rounds up. Integer division
# truncates before #ceil runs, which leaves an extra undersized slice per
# batch (e.g. 333/333/333/1 for a concurrency of 3) and produces a slice
# size of 0 once the concurrency exceeds batch_size.
slice_size = batch_size.fdiv(options[:concurrency]).ceil
index . types . each do | type |
type . adapter . default_scope . reorder ( nil ) . find_in_batches ( batch_size : batch_size ) do | batch |
futures = [ ]
index . adapter . default_scope . reorder ( nil ) . find_in_batches ( batch_size : batch_size ) do | batch |
futures = [ ]
batch . each_slice ( slice_size ) do | records |
futures << Concurrent :: Future . execute ( executor : pool ) do
begin
if ! progress . total . nil? && progress . progress + records . size > progress . total
# The number of items has changed between start and now,
# since there is no good way to predict the final count from
# here, just change the progress bar to an indeterminate one
batch . each_slice ( slice_size ) do | records |
futures << Concurrent :: Future . execute ( executor : pool ) do
begin
if ! progress . total . nil? && progress . progress + records . size > progress . total
# The number of items has changed between start and now,
# since there is no good way to predict the final count from
# here, just change the progress bar to an indeterminate one
progress . total = nil
end
progress . total = nil
end
grouped_records = nil
bulk_body = nil
index_count = 0
delete_count = 0
grouped_records = nil
bulk_body = nil
index_count = 0
delete_count = 0
ActiveRecord :: Base . connection_pool . with_connection do
grouped_records = type . adapter . send ( :grouped_objects , records )
bulk_body = Chewy :: Type :: Import :: BulkBuilder . new ( type , ** grouped_records ) . bulk_body
ActiveRecord :: Base . connection_pool . with_connection do
grouped_records = records . to_a . group_by do | record |
index . adapter . send ( :delete_from_index? , record ) ? :delete : :to_index
end
index_count = grouped_records [ :index ] . size if grouped_records . key? ( :index )
delete_count = grouped_records [ :delete ] . size if grouped_records . key? ( :delete )
# The following is an optimization for statuses specifically, since
# we want to de-index statuses that cannot be searched by anybody,
# but can't use Chewy's delete_if logic because it doesn't use
# crutches and our searchable_by logic depends on them
if type == StatusesIndex :: Status
bulk_body . map! do | entry |
if entry [ :index ] && entry . dig ( :index , :data , 'searchable_by' ) . blank?
index_count -= 1
delete_count += 1
{ delete : entry [ :index ] . except ( :data ) }
else
entry
end
bulk_body = Chewy :: Index :: Import :: BulkBuilder . new ( index , ** grouped_records ) . bulk_body
end
index_count = grouped_records [ :to_index ] . size if grouped_records . key? ( :to_index )
delete_count = grouped_records [ :delete ] . size if grouped_records . key? ( :delete )
# The following is an optimization for statuses specifically, since
# we want to de-index statuses that cannot be searched by anybody,
# but can't use Chewy's delete_if logic because it doesn't use
# crutches and our searchable_by logic depends on them.
#
# Note: `:to_index` is only the keyword under which records are handed
# to the BulkBuilder. The entries it emits in the bulk body are keyed by
# the Elasticsearch bulk action name, so index operations appear under
# `:index` and that is the key to look for here.
if index == StatusesIndex
  bulk_body.map! do |entry|
    if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
      # Nobody can search this status: turn the index operation into a
      # delete and keep the reported counters in sync.
      index_count -= 1
      delete_count += 1

      { delete: entry[:index].except(:data) }
    else
      entry
    end
  end
end
Chewy :: Type :: Import :: BulkRequest . new ( type ) . perform ( bulk_body )
Chewy :: Index :: Import :: BulkRequest . new ( index ) . perform ( bulk_body )
progress . progress += records . size
progress . progress += records . size
added . increment ( index_count )
removed . increment ( delete_count )
added . increment ( index_count )
removed . increment ( delete_count )
sleep 1
rescue = > e
progress . log pastel . red ( " Error importing #{ index } : #{ e } " )
end
sleep 1
rescue = > e
progress . log pastel . red ( " Error importing #{ index } : #{ e } " )
end
end
futures . map ( & :value )
end
futures . map ( & :value )
end
end