Source location routine

It has been some time since we have had any discussion on this topic. I thought I should post the full code so that others can try it.

This requires the user to find the events manually and pick off the arrival times. Approximate locations of the stations can be found in the online instrument response XML files. A further development of this could be to automate gathering of lat/lon data from station names. Of course, precise locations would be better - if you knew them.

# Algorithm by Raspberry Boom member "dizcza"
# Posted by kpjamro
# For 3 or more sound arrival times, at 3 or more locations
# this python routine estimates the source location and time
# with more than 3 stations, it selects the best combination of 3 ('triplet')
import geopy.distance
from geopy.distance import geodesic
import numpy as np
from scipy.optimize import fsolve
from itertools import combinations
from datetime import datetime
import time

def localization_impossible(coords, arrivals, v_min=300):
    ii = np.array(list(combinations(range(len(coords)), r=2)))    
    # remove the same stations
    mask = np.linalg.norm(coords[ii[:, 0]] - coords[ii[:, 1]], axis=1) > 1e-6
    ii = ii[mask]    
    possible = []
    for i, j in ii:
        dist = geopy.distance.geodesic(coords[i], coords[j]).m
        t_diff = abs(arrivals[i] - arrivals[j])        
        possible.append(t_diff < (dist)/v_min)    
    return sum(possible) < 3


def time_source_func(coords, arrivals, source, v):
    dist = np.array([geopy.distance.geodesic(source, xy).m for xy in coords])
    t_source = arrivals - dist / v
    return t_source


def get_time_best(t_source2d):
    zero_vec = t_source2d[:, [0]] - t_source2d[:, 1:]    
    cost = np.abs(zero_vec).sum(axis=1)    
    argmin = cost.argmin()    
    return t_source2d[argmin]

def get_best_triplet(t_source, ii):
    t_source2d = t_source[ii]
    zero_vec = t_source2d[:, [0]] - t_source2d[:, 1:]
    cost = np.abs(zero_vec).sum(axis=1)
    argmin = cost.argmin()
    return ii[argmin]

def solve_bounded(source, *args):
    coords, arrivals, v, ii = args
    t_source = time_source_func(coords, arrivals, source, v)
    t_source = get_time_best(t_source[ii])
    zero_vec = t_source[0] - t_source[1:]
    return zero_vec


def get_triple_indices(coords):
    ii = np.array(list(combinations(range(len(coords)), r=3)))
    pairwise_dist = np.linalg.norm(coords[ii[:, [0]]] - coords[ii[:, 1:]], axis=-1)
    mask = (pairwise_dist > 1e-6).all(axis=1)
    ii = ii[mask]
    return ii


def localize_source(coords, arrivals, v=330):
    # coords: [lat, lon] N pairs
    # arrivals: [t0, ... t_N-1]

    def solve_unbounded(source, *args):
        if np.abs(source).max() > 90:
            return np.array([1e9, 1e9])
        return solve_bounded(source, *args)

    if localization_impossible(coords, arrivals):
        return None, None, None

    ii = get_triple_indices(coords)
    res, info, status, message = fsolve(solve_unbounded,
                                        x0=coords.mean(axis=0),
                                        args=(coords, arrivals, v, ii),
                                        full_output=True)

    if status != 1:
        # res, t_source = localize_source_bounded(coords, arrivals, v=v)
        res, t_source, triplet_best = None, None, None
    else:
        # print(np.abs(info['fvec']).sum())
        t_source = time_source_func(coords, arrivals, source=res, v=v)
        triplet_best = get_best_triplet(t_source, ii)
        t_source = t_source[triplet_best].mean()

    prop = {
        "triplet": triplet_best
    }
    return res, t_source, prop

    #  ===== enter data here =============================== #
sound_speed = 343.0
# station location lat, lon
stations = np.array([
    [39.027, -76.887],
    [38.991, -76.086],
    [38.874, -77.340],
    [38.757, -77.058] 
])
# corresponding arrivals "dd/mm/yyyy hh:mm:ss.fff" **UTC**
arrivals = np.array([
    "4/6/2023 19:08:56.072",
    "4/6/2023 19:08:48.315",
    "4/6/2023 19:09:52.600",
    "4/6/2023 19:08:58.846"
])
   # ===== end of data section ==============================#

# the "program" #

print("Input arrival times")
for n in range(0, len(arrivals)): print(arrivals[n])
ats = []
ar = [] 
# change date/time format to seconds in epoch including fractional seconds
for n in range(0, len(arrivals)):
    arrstr = str(arrivals[n])
    ats.append(time.mktime(datetime.strptime(arrstr, "%d/%m/%Y %H:%M:%S.%f").timetuple()))        
    a = (datetime.strptime(arrstr,"%d/%m/%Y %H:%M:%S.%f"))
    ar.append(a.microsecond/1e6 + ats[n])
arrivals = ar    
print("\nArrival times converted to uSec ")
for n in range(0, len(arrivals)): print(ar[n])
print("\nStations ")
for n in range(0, len(arrivals)): print(stations[n])

# this does the work:
source_estimated, t_source_estimated_seconds, prop = localize_source(coords=stations, arrivals=arrivals, v=sound_speed)

print("\nEst. loc. and date/time of event =\n", source_estimated, "   ", time.ctime(t_source_estimated_seconds))
print("\n selected",prop)

# end of file ======================================================


here is a sample output from the data contained in listing

python source_loc.py

Input arrival times
4/6/2023 19:08:56.072
4/6/2023 19:08:48.315
4/6/2023 19:09:52.600
4/6/2023 19:08:58.846

Arrival times converted to uSec
1685920136.072
1685920128.315
1685920192.6
1685920138.846

Stations
[ 39.027 -76.887]
[ 38.991 -76.086]
[ 38.874 -77.34 ]
[ 38.757 -77.058]

Est. loc. and date/time of event =
 [ 38.87739603 -76.92202799]     Sun Jun  4 19:08:06 2023

 selected {'triplet': array([0, 2, 3])}

(the end)

3 Likes

This is super-interesting kpjamro! Thank you for sharing the complete code with us in the community.

I think @sheeny has implemented a method that automates the coordinates collection from the online XML files, or am I wrong? If not, the two could be combined, making the script even more efficient.

1 Like

@kpjamro

LOL: localization_impossible

branden

2 Likes