| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- #!/usr/bin/env ruby
- # Backfill normalized address fields for existing da_* tables.
- # Requires: lib/db.rb, lib/geocode.rb (and GOOGLE_MAPS_API_KEY set)
- require "time"
- require_relative "../lib/db"
- require_relative "../lib/geocode"
- ONLY_TABLE = ENV["ONLY_TABLE"].to_s.strip # e.g. "da_meandervalley"
- LIMIT_TOTAL = (ENV["LIMIT"] || "0").to_i # 0 = no cap
- BATCH_SIZE = (ENV["BATCH_SIZE"] || "200").to_i
- SLEEP_MS = (ENV["SLEEP_MS"] || "200").to_i # pause between lookups
- DRY_RUN = ENV["DRY_RUN"] == "1"
- OVERWRITE_ADDRESS = ENV["OVERWRITE_ADDRESS"] == "1" # write normalized string into address column
- START_ID = (ENV["START_ID"] || "0").to_i # resume after this id
- VERBOSE = ENV["VERBOSE"] == "1"
- def say(msg) puts msg end
- def warnx(msg) $stderr.puts msg end
- def msleep(ms) sleep(ms.to_f / 1000.0) end
- def discover_da_tables
- rs = DB.client.query("SHOW TABLES")
- arr = rs.map { |r| r.values.first.to_s }.select { |t| t.start_with?("da_") }
- if ONLY_TABLE != ""
- arr = arr.select { |t| t == ONLY_TABLE }
- end
- arr
- end
- def need_update_clause(overwrite:)
- if overwrite
- # touch everything, so we also overwrite address if desired
- "1=1"
- else
- # fill blanks only
- "(address_std IS NULL OR address_std='') OR street IS NULL OR locality IS NULL OR state IS NULL OR postcode IS NULL OR lat IS NULL OR lng IS NULL"
- end
- end
- def prep_statements(table, overwrite:)
- esc = DB.client.escape(table)
- sel_sql = <<~SQL
- SELECT id, council_reference, address
- FROM `#{esc}`
- WHERE id > ?
- AND #{need_update_clause(overwrite: overwrite)}
- ORDER BY id
- LIMIT ?
- SQL
- # Safe update that always fills normalized columns
- upd_sql = <<~SQL
- UPDATE `#{esc}`
- SET address_std = ?,
- street = ?,
- locality = ?,
- state = ?,
- postcode = ?,
- lat = ?,
- lng = ?
- WHERE id = ?
- SQL
- # Optional overwrite of the canonical address (guarded by unique pair)
- over_sql = <<~SQL
- UPDATE `#{esc}` AS t
- SET address = ?
- WHERE t.id = ?
- AND NOT EXISTS (
- SELECT 1 FROM `#{esc}` AS x
- WHERE x.id <> t.id
- AND x.council_reference = t.council_reference
- AND x.address = ?
- )
- SQL
- [
- DB.client.prepare(sel_sql),
- DB.client.prepare(upd_sql),
- DB.client.prepare(over_sql)
- ]
- end
- def normalize_one!(row, table, upd_stmt, over_stmt, overwrite:)
- id = row["id"]
- ref = row["council_reference"].to_s
- addr = row["address"].to_s
- # Call geocoder (cached if seen before)
- geo = Geocode.format_au(addr)
- unless geo
- warnx " [#{table}] ##{id} #{ref} — geocode miss for: #{addr.inspect}"
- return :miss
- end
- display = geo[:display].to_s
- street = geo[:street].to_s
- loc = geo[:locality].to_s
- state = geo[:state].to_s
- pc = geo[:postcode].to_s
- lat = geo[:lat]
- lng = geo[:lng]
- if DRY_RUN
- say " [#{table}] ##{id} #{ref} -> #{display} | #{lat},#{lng}"
- return :ok
- end
- # Always write normalized columns
- upd_stmt.execute(display, street, loc, state, pc, lat, lng, id)
- # Optionally overwrite the canonical address, but avoid breaking the unique key
- if overwrite && !display.empty? && display != addr
- over_stmt.execute(display, id, display)
- end
- :ok
- rescue StandardError => e
- warnx " [#{table}] ##{id} #{ref} — update error: #{e.class} #{e.message}"
- :err
- end
- def process_table(table)
- sel_stmt, upd_stmt, over_stmt = prep_statements(table, overwrite: OVERWRITE_ADDRESS)
- processed = 0
- ok = miss = err = 0
- last_id = START_ID
- say "Table #{table} — starting at id > #{last_id}, batch=#{BATCH_SIZE}, overwrite_address=#{OVERWRITE_ADDRESS}, dry_run=#{DRY_RUN}"
- loop do
- rs = sel_stmt.execute(last_id, BATCH_SIZE)
- rows = rs.to_a
- break if rows.empty?
- rows.each do |row|
- rc = normalize_one!(row, table, upd_stmt, over_stmt, overwrite: OVERWRITE_ADDRESS)
- case rc
- when :ok then ok += 1
- when :miss then miss+= 1
- when :err then err += 1
- end
- processed += 1
- last_id = row["id"]
- msleep(SLEEP_MS) if SLEEP_MS > 0
- if LIMIT_TOTAL > 0 && processed >= LIMIT_TOTAL
- say "Limit reached (#{LIMIT_TOTAL}). Stopping."
- return [processed, ok, miss, err]
- end
- end
- say " … up to id #{last_id} (ok=#{ok}, miss=#{miss}, err=#{err})" if VERBOSE
- end
- [processed, ok, miss, err]
- end
- # ---- main ----
- start = Time.now
- tables = discover_da_tables
- if tables.empty?
- warnx "No da_* tables found#{ONLY_TABLE != "" ? " matching #{ONLY_TABLE}" : ""}"
- exit 0
- end
- grand = { processed: 0, ok: 0, miss: 0, err: 0 }
- tables.each do |t|
- p, o, m, e = process_table(t)
- say "Finished #{t}: processed=#{p}, ok=#{o}, miss=#{m}, err=#{e}"
- grand[:processed] += p; grand[:ok] += o; grand[:miss] += m; grand[:err] += e
- end
- dur = Time.now - start
- say "All done in #{dur.round(1)}s — processed=#{grand[:processed]}, ok=#{grand[:ok]}, miss=#{grand[:miss]}, err=#{grand[:err]}"
|