backfill_geocode.rb 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #!/usr/bin/env ruby
  2. # Backfill normalized address fields for existing da_* tables.
  3. # Requires: lib/db.rb, lib/geocode.rb (and GOOGLE_MAPS_API_KEY set)
  4. require "time"
  5. require_relative "../lib/db"
  6. require_relative "../lib/geocode"
  7. ONLY_TABLE = ENV["ONLY_TABLE"].to_s.strip # e.g. "da_meandervalley"
  8. LIMIT_TOTAL = (ENV["LIMIT"] || "0").to_i # 0 = no cap
  9. BATCH_SIZE = (ENV["BATCH_SIZE"] || "200").to_i
  10. SLEEP_MS = (ENV["SLEEP_MS"] || "200").to_i # pause between lookups
  11. DRY_RUN = ENV["DRY_RUN"] == "1"
  12. OVERWRITE_ADDRESS = ENV["OVERWRITE_ADDRESS"] == "1" # write normalized string into address column
  13. START_ID = (ENV["START_ID"] || "0").to_i # resume after this id
  14. VERBOSE = ENV["VERBOSE"] == "1"
  15. def say(msg) puts msg end
  16. def warnx(msg) $stderr.puts msg end
  17. def msleep(ms) sleep(ms.to_f / 1000.0) end
  18. def discover_da_tables
  19. rs = DB.client.query("SHOW TABLES")
  20. arr = rs.map { |r| r.values.first.to_s }.select { |t| t.start_with?("da_") }
  21. if ONLY_TABLE != ""
  22. arr = arr.select { |t| t == ONLY_TABLE }
  23. end
  24. arr
  25. end
  26. def ensure_columns!(table)
  27. Geocode.ensure_da_columns!(table)
  28. end
  29. def need_update_clause(overwrite:)
  30. if overwrite
  31. # touch everything, so we also overwrite address if desired
  32. "1=1"
  33. else
  34. # fill blanks only
  35. "(address_std IS NULL OR address_std='') OR street IS NULL OR locality IS NULL OR state IS NULL OR postcode IS NULL OR lat IS NULL OR lng IS NULL"
  36. end
  37. end
  38. def prep_statements(table, overwrite:)
  39. esc = DB.client.escape(table)
  40. sel_sql = <<~SQL
  41. SELECT id, council_reference, address
  42. FROM `#{esc}`
  43. WHERE id > ?
  44. AND #{need_update_clause(overwrite: overwrite)}
  45. ORDER BY id
  46. LIMIT ?
  47. SQL
  48. # Safe update that always fills normalized columns
  49. upd_sql = <<~SQL
  50. UPDATE `#{esc}`
  51. SET address_std = ?,
  52. street = ?,
  53. locality = ?,
  54. state = ?,
  55. postcode = ?,
  56. lat = ?,
  57. lng = ?
  58. WHERE id = ?
  59. SQL
  60. # Optional overwrite of the canonical address (guarded by unique pair)
  61. over_sql = <<~SQL
  62. UPDATE `#{esc}` AS t
  63. SET address = ?
  64. WHERE t.id = ?
  65. AND NOT EXISTS (
  66. SELECT 1 FROM `#{esc}` AS x
  67. WHERE x.id <> t.id
  68. AND x.council_reference = t.council_reference
  69. AND x.address = ?
  70. )
  71. SQL
  72. [
  73. DB.client.prepare(sel_sql),
  74. DB.client.prepare(upd_sql),
  75. DB.client.prepare(over_sql)
  76. ]
  77. end
  78. def normalize_one!(row, table, upd_stmt, over_stmt, overwrite:)
  79. id = row["id"]
  80. ref = row["council_reference"].to_s
  81. addr = row["address"].to_s
  82. # Call geocoder (cached if seen before)
  83. geo = Geocode.format_au(addr)
  84. unless geo
  85. warnx " [#{table}] ##{id} #{ref} — geocode miss for: #{addr.inspect}"
  86. return :miss
  87. end
  88. display = geo[:display].to_s
  89. street = geo[:street].to_s
  90. loc = geo[:locality].to_s
  91. state = geo[:state].to_s
  92. pc = geo[:postcode].to_s
  93. lat = geo[:lat]
  94. lng = geo[:lng]
  95. if DRY_RUN
  96. say " [#{table}] ##{id} #{ref} -> #{display} | #{lat},#{lng}"
  97. return :ok
  98. end
  99. # Always write normalized columns
  100. upd_stmt.execute(display, street, loc, state, pc, lat, lng, id)
  101. # Optionally overwrite the canonical address, but avoid breaking the unique key
  102. if overwrite && !display.empty? && display != addr
  103. over_stmt.execute(display, id, display)
  104. end
  105. :ok
  106. rescue => e
  107. warnx " [#{table}] ##{id} #{ref} — update error: #{e.class} #{e.message}"
  108. :err
  109. end
  110. def process_table(table)
  111. ensure_columns!(table)
  112. sel_stmt, upd_stmt, over_stmt = prep_statements(table, overwrite: OVERWRITE_ADDRESS)
  113. processed = 0
  114. ok = miss = err = 0
  115. last_id = START_ID
  116. say "Table #{table} — starting at id > #{last_id}, batch=#{BATCH_SIZE}, overwrite_address=#{OVERWRITE_ADDRESS}, dry_run=#{DRY_RUN}"
  117. loop do
  118. rs = sel_stmt.execute(last_id, BATCH_SIZE)
  119. rows = rs.to_a
  120. break if rows.empty?
  121. rows.each do |row|
  122. rc = normalize_one!(row, table, upd_stmt, over_stmt, overwrite: OVERWRITE_ADDRESS)
  123. case rc
  124. when :ok then ok += 1
  125. when :miss then miss+= 1
  126. when :err then err += 1
  127. end
  128. processed += 1
  129. last_id = row["id"]
  130. msleep(SLEEP_MS) if SLEEP_MS > 0
  131. if LIMIT_TOTAL > 0 && processed >= LIMIT_TOTAL
  132. say "Limit reached (#{LIMIT_TOTAL}). Stopping."
  133. return [processed, ok, miss, err]
  134. end
  135. end
  136. say " … up to id #{last_id} (ok=#{ok}, miss=#{miss}, err=#{err})" if VERBOSE
  137. end
  138. [processed, ok, miss, err]
  139. end
  140. # ---- main ----
  141. start = Time.now
  142. tables = discover_da_tables
  143. if tables.empty?
  144. warnx "No da_* tables found#{ONLY_TABLE != "" ? " matching #{ONLY_TABLE}" : ""}"
  145. exit 0
  146. end
  147. grand = { processed: 0, ok: 0, miss: 0, err: 0 }
  148. tables.each do |t|
  149. p, o, m, e = process_table(t)
  150. say "Finished #{t}: processed=#{p}, ok=#{o}, miss=#{m}, err=#{e}"
  151. grand[:processed] += p; grand[:ok] += o; grand[:miss] += m; grand[:err] += e
  152. end
  153. dur = Time.now - start
  154. say "All done in #{dur.round(1)}s — processed=#{grand[:processed]}, ok=#{grand[:ok]}, miss=#{grand[:miss]}, err=#{grand[:err]}"