backfill_geocode.rb 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. #!/usr/bin/env ruby
  2. # Backfill normalized address fields for existing da_* tables.
  3. # Requires: lib/db.rb, lib/geocode.rb (and GOOGLE_MAPS_API_KEY set)
  4. require "time"
  5. require_relative "../lib/db"
  6. require_relative "../lib/geocode"
  7. ONLY_TABLE = ENV["ONLY_TABLE"].to_s.strip # e.g. "da_meandervalley"
  8. LIMIT_TOTAL = (ENV["LIMIT"] || "0").to_i # 0 = no cap
  9. BATCH_SIZE = (ENV["BATCH_SIZE"] || "200").to_i
  10. SLEEP_MS = (ENV["SLEEP_MS"] || "200").to_i # pause between lookups
  11. DRY_RUN = ENV["DRY_RUN"] == "1"
  12. OVERWRITE_ADDRESS = ENV["OVERWRITE_ADDRESS"] == "1" # write normalized string into address column
  13. START_ID = (ENV["START_ID"] || "0").to_i # resume after this id
  14. VERBOSE = ENV["VERBOSE"] == "1"
  15. def say(msg) puts msg end
  16. def warnx(msg) $stderr.puts msg end
  17. def msleep(ms) sleep(ms.to_f / 1000.0) end
  18. def discover_da_tables
  19. rs = DB.client.query("SHOW TABLES")
  20. arr = rs.map { |r| r.values.first.to_s }.select { |t| t.start_with?("da_") }
  21. if ONLY_TABLE != ""
  22. arr = arr.select { |t| t == ONLY_TABLE }
  23. end
  24. arr
  25. end
  26. def need_update_clause(overwrite:)
  27. if overwrite
  28. # touch everything, so we also overwrite address if desired
  29. "1=1"
  30. else
  31. # fill blanks only
  32. "(address_std IS NULL OR address_std='') OR street IS NULL OR locality IS NULL OR state IS NULL OR postcode IS NULL OR lat IS NULL OR lng IS NULL"
  33. end
  34. end
  35. def prep_statements(table, overwrite:)
  36. esc = DB.client.escape(table)
  37. sel_sql = <<~SQL
  38. SELECT id, council_reference, address
  39. FROM `#{esc}`
  40. WHERE id > ?
  41. AND #{need_update_clause(overwrite: overwrite)}
  42. ORDER BY id
  43. LIMIT ?
  44. SQL
  45. # Safe update that always fills normalized columns
  46. upd_sql = <<~SQL
  47. UPDATE `#{esc}`
  48. SET address_std = ?,
  49. street = ?,
  50. locality = ?,
  51. state = ?,
  52. postcode = ?,
  53. lat = ?,
  54. lng = ?
  55. WHERE id = ?
  56. SQL
  57. # Optional overwrite of the canonical address (guarded by unique pair)
  58. over_sql = <<~SQL
  59. UPDATE `#{esc}` AS t
  60. SET address = ?
  61. WHERE t.id = ?
  62. AND NOT EXISTS (
  63. SELECT 1 FROM `#{esc}` AS x
  64. WHERE x.id <> t.id
  65. AND x.council_reference = t.council_reference
  66. AND x.address = ?
  67. )
  68. SQL
  69. [
  70. DB.client.prepare(sel_sql),
  71. DB.client.prepare(upd_sql),
  72. DB.client.prepare(over_sql)
  73. ]
  74. end
  75. def normalize_one!(row, table, upd_stmt, over_stmt, overwrite:)
  76. id = row["id"]
  77. ref = row["council_reference"].to_s
  78. addr = row["address"].to_s
  79. # Call geocoder (cached if seen before)
  80. geo = Geocode.format_au(addr)
  81. unless geo
  82. warnx " [#{table}] ##{id} #{ref} — geocode miss for: #{addr.inspect}"
  83. return :miss
  84. end
  85. display = geo[:display].to_s
  86. street = geo[:street].to_s
  87. loc = geo[:locality].to_s
  88. state = geo[:state].to_s
  89. pc = geo[:postcode].to_s
  90. lat = geo[:lat]
  91. lng = geo[:lng]
  92. if DRY_RUN
  93. say " [#{table}] ##{id} #{ref} -> #{display} | #{lat},#{lng}"
  94. return :ok
  95. end
  96. # Always write normalized columns
  97. upd_stmt.execute(display, street, loc, state, pc, lat, lng, id)
  98. # Optionally overwrite the canonical address, but avoid breaking the unique key
  99. if overwrite && !display.empty? && display != addr
  100. over_stmt.execute(display, id, display)
  101. end
  102. :ok
  103. rescue StandardError => e
  104. warnx " [#{table}] ##{id} #{ref} — update error: #{e.class} #{e.message}"
  105. :err
  106. end
  107. def process_table(table)
  108. sel_stmt, upd_stmt, over_stmt = prep_statements(table, overwrite: OVERWRITE_ADDRESS)
  109. processed = 0
  110. ok = miss = err = 0
  111. last_id = START_ID
  112. say "Table #{table} — starting at id > #{last_id}, batch=#{BATCH_SIZE}, overwrite_address=#{OVERWRITE_ADDRESS}, dry_run=#{DRY_RUN}"
  113. loop do
  114. rs = sel_stmt.execute(last_id, BATCH_SIZE)
  115. rows = rs.to_a
  116. break if rows.empty?
  117. rows.each do |row|
  118. rc = normalize_one!(row, table, upd_stmt, over_stmt, overwrite: OVERWRITE_ADDRESS)
  119. case rc
  120. when :ok then ok += 1
  121. when :miss then miss+= 1
  122. when :err then err += 1
  123. end
  124. processed += 1
  125. last_id = row["id"]
  126. msleep(SLEEP_MS) if SLEEP_MS > 0
  127. if LIMIT_TOTAL > 0 && processed >= LIMIT_TOTAL
  128. say "Limit reached (#{LIMIT_TOTAL}). Stopping."
  129. return [processed, ok, miss, err]
  130. end
  131. end
  132. say " … up to id #{last_id} (ok=#{ok}, miss=#{miss}, err=#{err})" if VERBOSE
  133. end
  134. [processed, ok, miss, err]
  135. end
  136. # ---- main ----
  137. start = Time.now
  138. tables = discover_da_tables
  139. if tables.empty?
  140. warnx "No da_* tables found#{ONLY_TABLE != "" ? " matching #{ONLY_TABLE}" : ""}"
  141. exit 0
  142. end
  143. grand = { processed: 0, ok: 0, miss: 0, err: 0 }
  144. tables.each do |t|
  145. p, o, m, e = process_table(t)
  146. say "Finished #{t}: processed=#{p}, ok=#{o}, miss=#{m}, err=#{e}"
  147. grand[:processed] += p; grand[:ok] += o; grand[:miss] += m; grand[:err] += e
  148. end
  149. dur = Time.now - start
  150. say "All done in #{dur.round(1)}s — processed=#{grand[:processed]}, ok=#{grand[:ok]}, miss=#{grand[:miss]}, err=#{grand[:err]}"