Source code for refl1d.probe.stitch

"""
Data stitching.

Join together datasets yielding unique sorted x.
"""

from numpy import hstack, vstack, argsort, sum, sqrt


[docs] def stitch(data, same_x=0.001, same_dx=0.001): """ Stitch together multiple measurements into one. *data* a list of datasets with x, dx, y, dy attributes *same_x* minimum point separation (default is 0.001). *same_dx* minimum change in resolution that may be averaged (default is 0.001). WARNING: the returned x values may be data dependent, with two measured sets having different x after stitching, even though the measurement conditions are identical!! Either add an intensity weight to the datasets:: probe.I = slitscan or use interpolation if you need to align two stitched scans:: import numpy as np x1, dx1, y1, dy1 = stitch([a1, b1, c1, d1]) x2, dx2, y2, dy2 = stitch([a2, b2, c2, d2]) x2[0], x2[-1] = x1[0], x1[-1] # Force matching end points y2 = np.interp(x1, x2, y2) dy2 = np.interp(x1, x2, dy2) x2 = x1 WARNING: the returned dx value underestimates the true x, depending on the relative weights of the averaged data points. """ if same_dx is None: same_dx = same_x x = hstack(p.x for p in data) dx = hstack(p.dx for p in data) y = hstack(p.y for p in data) dy = hstack(p.dy for p in data) if all(hasattr(p, "I") for p in data): weight = hstack(p.I for p in data) else: weight = y / dy**2 # y/dy**2 is approximately the intensity # Sort the data by increasing x idx = argsort(x) data = vstack((x, dx, y, dy, weight)) data = data[:, idx] x = data[0, :] dx = data[1, :] # Skip through the data looking for regions of overlap. keep = [] n, last, next = len(x), 0, 1 while next < n: while next < n and abs(x[next] - x[last]) <= same_x: next += 1 if next - last == 1: # Only one point, so no averaging necessary keep.append(last) else: # Pick the x in [last:next] with the best resolution and average # them using Poisson averages. Repeat until all points are used remainder = data[:, last:next] avg = [] while remainder.shape[1] > 0: best_dx = min(remainder[1, :]) idx = remainder[1, :] - best_dx <= same_dx avg.append(poisson_average(remainder[:, idx])) remainder = remainder[:, ~idx] # Store the result in worst to best resolution order. for i, d in enumerate(reversed(avg)): data[:, last + i] = d keep.append(last + i) last = next return data[:4, keep]
[docs] def poisson_average(xdxydyw): """ Compute the poisson average of y/dy using a set of data points. The returned x, dx is the weighted average of the inputs:: x = sum(x*I)/sum(I) dx = sum(dx*I)/sum(I) The returned y, dy use Poisson averaging:: w = sum(y/dy^2) y = sum((y/dy)^2)/w dy = sqrt(y/w) The above formula gives the expected result for combining two measurements, assuming there is no uncertainty in the monitor. :: measure N counts during M monitors rate: r = N/M rate uncertainty: dr = sqrt(N)/M weighted rate: r/dr^2 = (N/M) / (N/M^2) = M weighted rate squared: r^2/dr^2 = (N^2/M^2) / (N/M^2) = N for two measurements Na, Nb w = ra/dra^2 + rb/drb^2 = Ma + Mb y = ((ra/dra)^2 + (rb/drb)^2)/w = (Na + Nb)/(Ma + Mb) dy = sqrt(y/w) = sqrt( (Na + Nb)/ w^2 ) = sqrt(Na+Nb)/(Ma + Mb) This formula isn't strictly correct when applied to values which have been scaled, for example to account for an attenuator in the counting system. """ # TODO: need better estimate of dx, with weighted broadening according # to the distance of the x's from the centers. # TODO: check the accuracy of the formula in the presence of # attenuators. x, dx, y, _dy, weight = xdxydyw w = sum(weight) x = sum(x * weight) / w dx = sum(dx * weight) / w y = sum(y * weight) / w dy = sqrt(y / w) # print "averaging", xdxydy, x, dx, y, dy return x, dx, y, dy, w