Content pagination
Pagination allows large amounts of data to be split into manageable pieces.
Overview
The Rapid Content API makes a large amount of property data available. Because there is so much of it, the Content API supports pagination to break the data into manageable pieces. This document provides some examples and best practices for using the pagination functionality.
Basic example
The pagination flow starts with a property search whose results exceed what fits on a single page. When this happens, the response contains the first page of results, followed by a `Link` response header that leads to the next page.
Example request:
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia
Example response header:
Link: <https://api.ean.com/v3/properties/content?token=WVZTCUNRUQ4SUSNHAXIWFk0VRQ5JZhFWExRaXAgRVnpDA1RWTUkBB10FHFYGQwZyHwBNFA1HBhIMC1IGAUsGBhkHBGcHBBUGdlMHQAd1UA8WBwwMB1NcBAhdahBWUAdRXjtfVwpEBiEdASBHREpdEVwUQRxuRVgRWg1UaVkHS1kcA3IWBXEVAFZMAz1VBVRWXT5KRQNKFQVEACMXASJFVlRBVzoTRQZVQQRVOUdHVUAVDRRXIBNXJxdYAwtWQFJeVgpHAiYTCwoEWhRmZ0MHCxwFJhNbUEcGU1tCHW1dAWwAGlEIEAFVXEYNIRQBIRcTSltIAUVHTTxdAghAU3VDDSFCVkYCXFE8XgIMQwF7QAAlFwVZEVJpTUdWBBcHU2cBXgEKRgFwFVdxR1tWQQtHQhhuUFgAAA5WE1oKH0JcBEZVDGdVBBdVQl4BVQgFVRIVEFwWBBdHS2xKBU1RDANvDFFfX0cNekZTcxJeE1gQW24XDw8RDEdTIUBTJhFTAxZXb1lUAVNRa1ZZAFxHAXQVUHxDVxdDUAxcFRVmVFpQBlRbFFNxEAwgRXcMXAdfFUZbBFQAXFQGV1YCAVI=>; rel="next"; expires=2023-06-01T17:13:19.699379618Z
Following the provided link before the expiration time returns the next page of results, along with a new `Link` header for the page after that. To walk through the entire response, simply keep following each returned `Link` header until no more `Link` headers are returned. This indicates the end of the requested data set.
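To make the loop concrete, here is a minimal sketch of that follow-the-`Link` loop using Java's built-in `java.net.http` client. It omits the Authorization header that real Rapid calls require (a complete client appears in the code examples later), and `process` is a hypothetical placeholder for handling each page.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BasicPaginationLoop {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String url = "https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia";
        while (url != null) {
            HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            process(response.body()); // handle this page of results
            // Follow the Link header (the URL between '<' and '>') until none is returned.
            url = response.headers().firstValue("Link")
                    .map(link -> link.substring(link.indexOf('<') + 1, link.indexOf('>')))
                    .orElse(null);
        }
    }

    private static void process(String pageJson) {
        // Hypothetical placeholder: parse and store this page of property content.
    }
}
```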
Filtering the requested data
While the simple example above shows how pagination works, that search is also very large. When there are many properties, it can take quite a while to page through all of them. It helps to include additional query parameters so that only the properties actually needed are searched.
For example, perhaps only properties in the United States are needed, rather than all properties. This subset of properties can be requested by changing the request to include the `country_code` query parameter.
Example request with a country parameter:
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US
This still provides the same pagination functionality as above, but with fewer properties to page through.
Another way to reduce the number of properties requested is to fetch only the properties that have changed since the last time property data was pulled. Use the `date_updated_start` query parameter to return only properties that have changed since the given date.
Example request with country and date parameters:
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&date_updated_start=2023-01-02
Making sure to request only the properties that are needed is key to paginating faster and transferring less data.
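As a small illustration of the incremental pattern, the sketch below builds such a request URL from the date of the previous pull. The class name and stored date are hypothetical; in practice the date of the last successful pull would be loaded from wherever sync state is kept.

```java
import java.time.LocalDate;

public class IncrementalPullUrl {
    public static void main(String[] args) {
        // Hypothetical: load the date of the last successful pull from storage.
        LocalDate lastPull = LocalDate.of(2023, 1, 2);
        String url = "https://api.ean.com/v3/properties/content"
                + "?language=en-US&supply_source=expedia&country_code=US"
                + "&date_updated_start=" + lastPull; // LocalDate.toString() is ISO-8601 (yyyy-MM-dd)
        System.out.println(url);
    }
}
```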
Splitting up searches for parallelization
Sometimes, even when only the needed properties are requested, the volume of results is still large. In these cases, performing multiple searches in parallel can help speed up the process.
The first step is to split the desired search into smaller searches. This will be different for every use case, but it can be done by starting with the desired search and then adding further query parameters that do not overlap with each other.
For example, if the desired search is all properties within the United States, start by filtering on the country as in the example above.
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US
This search can then be split up further using the `property_rating_min` and `property_rating_max` query parameters.
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=0.0&property_rating_max=0.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=1.0&property_rating_max=1.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=2.0&property_rating_max=2.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=3.0&property_rating_max=3.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=4.0&property_rating_max=4.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=5.0
There are now six separate requests that can each be paged through independently and in parallel. The result is that the same set of data is retrieved, but faster.
Every case will be different, but starting with the desired search and looking at the `Pagination-Total-Results` response header on the first page of the response will give an indication of whether splitting up the search would be helpful.
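For instance, here is a minimal sketch of that check, again using `java.net.http` and omitting authentication; the 20,000 threshold is purely illustrative.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SplitCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String url = "https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US";
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        // Only the headers matter here, so discard the body.
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        int totalResults = response.headers().firstValue("Pagination-Total-Results")
                .map(Integer::parseInt)
                .orElse(0);
        if (totalResults > 20_000) { // hypothetical threshold
            System.out.println("Consider splitting this search further, e.g. by property rating.");
        }
    }
}
```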
Code examples
While the information above outlines the pagination process conceptually and how data can be split up, the Java code below provides a more concrete example.
**Note:** Proper exception handling and other best practices are not included in the code examples below. As always, all best practices should still be followed when writing production-ready code.
First, a simple `RapidClient` class can be used as the basis for making calls to Rapid.
```java
import java.text.MessageFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;

// Assumes the Jersey JAX-RS client and Apache Commons Codec are on the classpath.
// (For older Jersey versions, use the javax.ws.rs.* equivalents of these imports.)
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.glassfish.jersey.message.GZipEncoder;

public class RapidClient {
    // Base URL
    private static final String RAPID_BASE_URL = "https://api.ean.com";
    // Headers
    private static final String GZIP = "gzip";
    private static final String AUTHORIZATION_HEADER = "EAN APIKey={0},Signature={1},timestamp={2}";
    // HTTP Client
    private static final Client CLIENT = ClientBuilder.newClient().register(GZipEncoder.class);

    private final String apiKey;
    private final String sharedSecret;

    public RapidClient(String apiKey, String sharedSecret) {
        this.apiKey = apiKey;
        this.sharedSecret = sharedSecret;
    }

    public Response get(String path, MultivaluedMap<String, String> queryParameters) {
        WebTarget webTarget = CLIENT.target(RAPID_BASE_URL).path(path);
        // Add all query parameters from the map to the web target
        for (Map.Entry<String, List<String>> entry : queryParameters.entrySet()) {
            for (String value : entry.getValue()) {
                webTarget = webTarget.queryParam(entry.getKey(), value);
            }
        }
        return webTarget.request(MediaType.APPLICATION_JSON_TYPE)
                .header(HttpHeaders.ACCEPT_ENCODING, GZIP)
                .header(HttpHeaders.AUTHORIZATION, generateAuthHeader())
                .get();
    }

    private String generateAuthHeader() {
        final String timeStampInSeconds = String.valueOf(ZonedDateTime.now(ZoneOffset.UTC).toEpochSecond());
        final String input = apiKey + sharedSecret + timeStampInSeconds;
        final String signature = DigestUtils.sha512Hex(input);
        return MessageFormat.format(AUTHORIZATION_HEADER, apiKey, signature, timeStampInSeconds);
    }
}
```
This is just boilerplate code that will make the upcoming classes easier to read.
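For illustration, a hypothetical one-off use of this client might look like the following. The path and parameters mirror the earlier URL examples, the credentials are read from environment variables as in the `ParallelFileMaker` example further below, and the same `jakarta.ws.rs` imports are assumed.

```java
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;

public class RapidClientExample {
    public static void main(String[] args) {
        RapidClient client = new RapidClient(System.getenv("RAPID_APIKEY"), System.getenv("RAPID_SHARED_SECRET"));
        MultivaluedMap<String, String> params = new MultivaluedHashMap<>();
        params.putSingle("language", "en-US");
        params.putSingle("supply_source", "expedia");
        params.putSingle("country_code", "US");
        Response response = client.get("v3/properties/content", params);
        System.out.println("Status: " + response.getStatus()); // illustrative handling only
        response.close();
    }
}
```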
The next class will represent a specific Content API call and will use the `RapidClient` to make its calls.
```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// Assumes the same Jersey/JAX-RS dependencies as RapidClient, plus Apache Commons Lang and Commons Collections.
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;

public class PropertyContentCall {
    // Path
    private static final String PROPERTY_CONTENT_PATH = "v3/properties/content";
    // Headers
    private static final String LINK = "Link";
    private static final String PAGINATION_TOTAL_RESULTS = "Pagination-Total-Results";
    // Query parameter keys
    private static final String LANGUAGE = "language";
    private static final String SUPPLY_SOURCE = "supply_source";
    private static final String COUNTRY_CODE = "country_code";
    private static final String CATEGORY_ID = "category_id";
    private static final String TOKEN = "token";
    private static final String INCLUDE = "include";
    // Call parameters
    private final RapidClient client;
    private final String language;
    private final String supplySource;
    private final List<String> countryCodes;
    private final List<String> categoryIds;
    private String token;

    public PropertyContentCall(RapidClient client, String language, String supplySource,
                               List<String> countryCodes, List<String> categoryIds) {
        this.client = client;
        this.language = language;
        this.supplySource = supplySource;
        this.countryCodes = countryCodes;
        this.categoryIds = categoryIds;
    }

    public Stream<RapidPropertyContent> stream() {
        return Stream.generate(() -> {
                    synchronized (this) {
                        // Make the call to Rapid.
                        final Response response = client.get(PROPERTY_CONTENT_PATH, queryParameters());
                        // Read the response to return.
                        final Map<String, RapidPropertyContent> propertyContents = response.readEntity(new GenericType<>() { });
                        // Store the token for pagination if we got one.
                        token = getTokenFromLink(response.getHeaderString(LINK));
                        return propertyContents;
                    }
                })
                .takeWhile(MapUtils::isNotEmpty)
                .map(Map::values)
                .flatMap(Collection::stream);
    }

    public Integer size() {
        // Make the call to Rapid.
        final MultivaluedMap<String, String> queryParameters = queryParameters();
        queryParameters.putSingle(INCLUDE, "property_ids");
        final Response response = client.get(PROPERTY_CONTENT_PATH, queryParameters);
        // Read the size to return.
        final Integer size = Integer.parseInt(response.getHeaderString(PAGINATION_TOTAL_RESULTS));
        // Close the response since we're not reading it.
        response.close();
        return size;
    }

    private MultivaluedMap<String, String> queryParameters() {
        final MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();
        if (token != null) {
            queryParams.putSingle(TOKEN, token);
        } else {
            // Add required parameters
            queryParams.putSingle(LANGUAGE, language);
            queryParams.putSingle(SUPPLY_SOURCE, supplySource);
            // Add optional parameters
            if (CollectionUtils.isNotEmpty(countryCodes)) {
                queryParams.put(COUNTRY_CODE, countryCodes);
            }
            if (CollectionUtils.isNotEmpty(categoryIds)) {
                queryParams.put(CATEGORY_ID, categoryIds);
            }
        }
        return queryParams;
    }

    private String getTokenFromLink(String linkHeader) {
        if (StringUtils.isEmpty(linkHeader)) {
            return null;
        }
        final int startOfToken = linkHeader.indexOf("=") + 1;
        final int endOfToken = linkHeader.indexOf(">");
        return linkHeader.substring(startOfToken, endOfToken);
    }
}
```
The `PropertyContentCall` represents a single request to the Rapid Content API and encapsulates the paging done through that call.
Example:
Compare the API call below with the equivalent Java request.
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US
```java
PropertyContentCall request = new PropertyContentCall(myRapidClient, "en-US", "expedia", List.of("US"), null);
```
- The `PropertyContentCall` used here is specific to this example. Calls are broken up by `country_code` and `category_id`, but that can be changed to fit the use case. Since this was written specifically for parallelization, the example uses Java parallel streams, which is why the public `stream()` method returns a stream of `RapidPropertyContent` objects. A `RapidPropertyContent` object is simply a POJO representing a single property from a Rapid Content API call. Although Java parallel streams are used here, any way of running code in parallel would suffice.
- When the code that calls `stream()` needs to read another property from the stream, this method either supplies one that has already been retrieved or calls the Rapid Content API for the next page of results and returns a property from it. Simply calling `stream()` and reading it to completion handles paging through every property the request returns.
- There is one more public helper method, `size()`, which provides a convenient way to see the total number of properties this `PropertyContentCall` will return. This helps determine whether a call is already small enough or needs to be split into smaller calls for parallelization (see the sketch after this list).
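Putting those two methods together, a sketch of that decision might look like the fragment below; `myRapidClient` is the client from the earlier example, and the 20,000 threshold is illustrative.

```java
PropertyContentCall call = new PropertyContentCall(myRapidClient, "en-US", "expedia", List.of("US"), null);
if (call.size() < 20_000) { // hypothetical threshold
    // Small enough: page through everything this call returns.
    call.stream().forEach(property -> {
        // Process each RapidPropertyContent here.
    });
} else {
    // Too big: split into smaller calls (e.g. by category_id) before streaming.
}
```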
The building blocks above provide the basis for calling Rapid and paging through the responses. The code below uses those classes to automatically split the calls into manageable pieces, page through all of the smaller calls in parallel, and write the combined output to a file.
```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

// Assumes Jackson Databind and the Jackson JavaTimeModule are on the classpath.
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class ParallelFileMaker {
    private static final String APIKEY = System.getenv().get("RAPID_APIKEY");
    private static final String SHARED_SECRET = System.getenv().get("RAPID_SHARED_SECRET");
    private static final List<String> COUNTRIES = Arrays.asList("AD", "AE", "AF", "AG", "AI", "AL", "AM", "AO", "AQ",
            "AR", "AS", "AT", "AU", "AW", "AX", "AZ", "BA", "BB", "BD", "BE", "BF", "BG", "BH", "BI", "BJ", "BL", "BM",
            "BN", "BO", "BQ", "BR", "BS", "BT", "BV", "BW", "BY", "BZ", "CA", "CC", "CD", "CF", "CG", "CH", "CI", "CK",
            "CL", "CM", "CN", "CO", "CR", "CU", "CV", "CW", "CX", "CY", "CZ", "DE", "DJ", "DK", "DM", "DO", "DZ", "EC",
            "EE", "EG", "EH", "ER", "ES", "ET", "FI", "FJ", "FK", "FM", "FO", "FR", "GA", "GB", "GD", "GE", "GF", "GG",
            "GH", "GI", "GL", "GM", "GN", "GP", "GQ", "GR", "GS", "GT", "GU", "GW", "GY", "HK", "HM", "HN", "HR", "HT",
            "HU", "ID", "IE", "IL", "IM", "IN", "IO", "IQ", "IR", "IS", "IT", "JE", "JM", "JO", "JP", "KE", "KG", "KH",
            "KI", "KM", "KN", "KP", "KR", "KW", "KY", "KZ", "LA", "LB", "LC", "LI", "LK", "LR", "LS", "LT", "LU", "LV",
            "LY", "MA", "MC", "MD", "ME", "MF", "MG", "MH", "MK", "ML", "MM", "MN", "MO", "MP", "MQ", "MR", "MS", "MT",
            "MU", "MV", "MW", "MX", "MY", "MZ", "NA", "NC", "NE", "NF", "NG", "NI", "NL", "NO", "NP", "NR", "NU", "NZ",
            "OM", "PA", "PE", "PF", "PG", "PH", "PK", "PL", "PM", "PN", "PR", "PS", "PT", "PW", "PY", "QA", "RE", "RO",
            "RS", "RU", "RW", "SA", "SB", "SC", "SD", "SE", "SG", "SH", "SI", "SJ", "SK", "SL", "SM", "SN", "SO", "SR",
            "SS", "ST", "SV", "SX", "SY", "SZ", "TC", "TD", "TF", "TG", "TH", "TJ", "TK", "TL", "TM", "TN", "TO", "TR",
            "TT", "TV", "TW", "TZ", "UA", "UG", "UM", "US", "UY", "UZ", "VA", "VC", "VE", "VG", "VI", "VN", "VU", "WF",
            "WS", "YE", "YT", "ZA", "ZM", "ZW");
    private static final List<String> PROPERTY_CATEGORIES = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8",
            "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "20", "21", "22", "23", "24", "25", "26",
            "29", "30", "31", "32", "33", "34", "36", "37", "39", "40", "41", "42", "43", "44");
    private static final int MAX_CALL_SIZE = 20_000;
    private static final String LANGUAGE = "en-US";
    private static final String SUPPLY_SOURCE = "expedia";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .registerModule(new JavaTimeModule());
    private static final RapidClient RAPID_CLIENT = new RapidClient(APIKEY, SHARED_SECRET);

    public void run() throws IOException {
        final Map<PropertyContentCall, Integer> allCalls = divideUpCalls();
        // Make sure we're making the calls in the most efficient order. This list will be smallest to largest, so
        // that when the streams get combined and are reversed, the largest stream will be first.
        final List<Stream<RapidPropertyContent>> callsToMake = allCalls.entrySet().stream()
                .filter(entry -> entry.getValue() > 0) // filter out any calls that don't have results
                .sorted(Map.Entry.comparingByValue()) // sort all the calls with the smallest calls first
                .map(Map.Entry::getKey) // just need the call itself now
                .map(PropertyContentCall::stream) // get the stream for each call
                .toList();
        // Combine all the streams into one big stream and actually make the calls and write to the file.
        try (Stream<RapidPropertyContent> bigStream = combineStreams(callsToMake);
             BufferedWriter outputFileWriter = createFileWriter(Path.of("output.jsonl.gz"))) {
            bigStream.parallel()
                    .forEach(property -> {
                        try {
                            // Write to output file
                            synchronized (outputFileWriter) {
                                outputFileWriter.append(OBJECT_MAPPER.writeValueAsString(property));
                                outputFileWriter.newLine();
                            }
                        } catch (Exception e) {
                            // Handle exception
                        }
                    });
        }
    }

    /**
     * This will split up the calls to be made based on the size of each call's results. It will first split into
     * calls per country and, if needed, it will then further split into calls per category for any country that is
     * too big on its own.
     * The size of each call is also kept so that the calls can be further sorted if needed.
     *
     * @return A map containing all the calls and their respective sizes.
     */
    private Map<PropertyContentCall, Integer> divideUpCalls() {
        // A concurrent map is required here because it is populated from parallel streams.
        final Map<PropertyContentCall, Integer> allCalls = new ConcurrentHashMap<>();
        COUNTRIES.stream().parallel()
                .forEach(countryCode -> {
                    // Check to see if the entire country is small enough to get at once.
                    final PropertyContentCall countryCall = new PropertyContentCall(RAPID_CLIENT, LANGUAGE,
                            SUPPLY_SOURCE, List.of(countryCode), null);
                    final Integer countryCallSize = countryCall.size();
                    if (countryCallSize < MAX_CALL_SIZE) {
                        // It's small enough! No need to break this call up further.
                        allCalls.put(countryCall, countryCallSize);
                    } else {
                        // The country is too big, need to break up the call into smaller parts.
                        PROPERTY_CATEGORIES.stream().parallel()
                                .forEach(category -> {
                                    final PropertyContentCall categoryCall = new PropertyContentCall(RAPID_CLIENT,
                                            LANGUAGE, SUPPLY_SOURCE, List.of(countryCode), List.of(category));
                                    allCalls.put(categoryCall, categoryCall.size());
                                });
                    }
                });
        return allCalls;
    }

    /**
     * This will combine multiple Streams into a single Stream. Because of how this is reduced, the Streams will end
     * up in the reverse order of the list that was passed in.
     * <p>
     * Note: Because this is concatenating multiple Streams together, each Stream will go on the stack. Thus, if
     * there are many Streams then a StackOverflowError can occur when trying to use the combined Stream. Make
     * sure the stack size is appropriate for your usage via the `-Xss` JVM parameter.
     *
     * @param streams A list of the Streams to combine.
     * @return The combined Stream that can be treated as one.
     */
    private <T> Stream<T> combineStreams(List<Stream<T>> streams) {
        return streams.stream()
                .filter(Objects::nonNull)
                .reduce(Stream::concat)
                .orElse(Stream.empty());
    }

    private BufferedWriter createFileWriter(Path path) throws IOException {
        return new BufferedWriter(
                new OutputStreamWriter(
                        new GZIPOutputStream(
                                Files.newOutputStream(path)),
                        StandardCharsets.UTF_8));
    }
}
```
While the code above has many inline comments explaining the individual pieces, it can be summarized as follows:
- Split the main call into smaller calls based on the use case. (In this example, the main call is for all content, split by `country_code` and, where needed, split further by `category_id`.)
- Specific to how this example combines parallel streams, sort the calls so that they run more efficiently.
- Then run those calls in parallel and write the properties they return to a file.
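Finally, a minimal, hypothetical entry point for running the whole job; it assumes the `RAPID_APIKEY` and `RAPID_SHARED_SECRET` environment variables are set.

```java
import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException {
        // Requires RAPID_APIKEY and RAPID_SHARED_SECRET in the environment; writes output.jsonl.gz.
        new ParallelFileMaker().run();
    }
}
```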