Content pagination

Pagination splits large amounts of data into manageable pieces.

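Overview

The Rapid Content API provides access to a large volume of property data. Because there is so much of it, the Content API supports pagination to split the data into manageable pieces. This document gives some examples and best practices for using pagination.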

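Basic example

The pagination flow begins with a property search that returns more results than fit on a single page. When that happens, the response contains the first page of results plus a Link response header, which is used to get the next page.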

Example request:

https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia

Example response header:

Link: <https://api.ean.com/v3/properties/content?token=WVZTCUNRUQ4SUSNHAXIWFk0VRQ5JZhFWExRaXAgRVnpDA1RWTUkBB10FHFYGQwZyHwBNFA1HBhIMC1IGAUsGBhkHBGcHBBUGdlMHQAd1UA8WBwwMB1NcBAhdahBWUAdRXjtfVwpEBiEdASBHREpdEVwUQRxuRVgRWg1UaVkHS1kcA3IWBXEVAFZMAz1VBVRWXT5KRQNKFQVEACMXASJFVlRBVzoTRQZVQQRVOUdHVUAVDRRXIBNXJxdYAwtWQFJeVgpHAiYTCwoEWhRmZ0MHCxwFJhNbUEcGU1tCHW1dAWwAGlEIEAFVXEYNIRQBIRcTSltIAUVHTTxdAghAU3VDDSFCVkYCXFE8XgIMQwF7QAAlFwVZEVJpTUdWBBcHU2cBXgEKRgFwFVdxR1tWQQtHQhhuUFgAAA5WE1oKH0JcBEZVDGdVBBdVQl4BVQgFVRIVEFwWBBdHS2xKBU1RDANvDFFfX0cNekZTcxJeE1gQW24XDw8RDEdTIUBTJhFTAxZXb1lUAVNRa1ZZAFxHAXQVUHxDVxdDUAxcFRVmVFpQBlRbFFNxEAwgRXcMXAdfFUZbBFQAXFQGV1YCAVI=>; rel="next"; expires=2023-06-01T17:13:19.699379618Z

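Following the link before its expiration time (the expires value) returns the next page of results, along with a new Link header for the page after that. To go through the entire response, keep following each returned Link header until no Link header is returned. That marks the end of the requested data set.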
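As a minimal sketch of reading that header, the hypothetical LinkHeader helper below extracts the next-page URL and the expires timestamp from a header shaped like the example above, using only the JDK. It assumes a single link with rel="next"; LinkHeader and its methods are illustrative names, not part of any Rapid SDK.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of any Rapid SDK) for a header shaped like:
// Link: <URL>; rel="next"; expires=2023-06-01T17:13:19.699379618Z
final class LinkHeader {
    // The URL sits between angle brackets and is followed by rel="next".
    private static final Pattern NEXT = Pattern.compile("<([^>]+)>\\s*;\\s*rel=\"next\"");
    // The expiry is an ISO-8601 instant in the parameters after the closing '>'.
    private static final Pattern EXPIRES = Pattern.compile(">.*expires=([^;\\s]+)");

    private LinkHeader() { }

    /** Next-page URL, or empty when there is no Link header (the last page). */
    static Optional<String> nextUrl(String header) {
        if (header == null || header.isBlank()) {
            return Optional.empty();
        }
        Matcher m = NEXT.matcher(header);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Expiry of the link, if the header carries one. */
    static Optional<Instant> expires(String header) {
        if (header == null) {
            return Optional.empty();
        }
        Matcher m = EXPIRES.matcher(header);
        return m.find() ? Optional.of(Instant.parse(m.group(1))) : Optional.empty();
    }

    /** True if the link should no longer be followed at the given time. */
    static boolean isExpired(String header, Instant now) {
        return expires(header).map(now::isAfter).orElse(false);
    }
}
```

Matching everything up to the closing `>` keeps any trailing `=` padding that is part of the token.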

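Filtering the requested data

The simple example above shows how pagination works, but it also searches a very large data set. When there are many properties, paging through all of them can take a while. It helps to include additional query parameters so that only the properties you actually need are searched.

For example, you might only need properties in the United States rather than all properties. This subset can be requested by adding the country_code query parameter to the request.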

Example request with the country parameter:

https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US

Pagination works exactly as above, but there are fewer properties to page through.

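Another way to reduce the number of properties requested is to fetch only the properties that have changed since you last pulled property data. Use the date_updated_start parameter to return only properties that have changed since the given date.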

Example request with the country and date parameters:

https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&date_updated_start=2023-01-02

Requesting only the properties you need is the key to faster pagination and less data transferred.
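A minimal sketch of assembling these filtered requests, assuming the parameter names shown in the example URLs above and that several country codes are sent as repeated country_code parameters (as the PropertyContentCall class later in this document does). ContentQuery is a hypothetical name.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper that builds a filtered property content URL.
final class ContentQuery {
    private static final String BASE = "https://api.ean.com/v3/properties/content";

    private ContentQuery() { }

    static String url(String language, String supplySource,
                      List<String> countryCodes, LocalDate updatedSince) {
        StringJoiner query = new StringJoiner("&");
        // Required parameters.
        query.add(param("language", language));
        query.add(param("supply_source", supplySource));
        // Optional filter: one country_code parameter per country.
        for (String code : countryCodes) {
            query.add(param("country_code", code));
        }
        // Optional filter: only properties changed since this date (formatted as YYYY-MM-DD).
        if (updatedSince != null) {
            query.add(param("date_updated_start", updatedSince.toString()));
        }
        return BASE + "?" + query;
    }

    private static String param(String key, String value) {
        return key + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
    }
}
```

In practice, updatedSince would be the date of your previous successful pull, stored between runs.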

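Splitting searches for parallelization

Sometimes the result set is still large even when you request only the properties you need. In that case, running several searches in parallel can speed up the process.

The first step is to split the desired search into smaller searches. How to do this differs for each use case, but in general you start from the desired search and add further query parameters that divide it into non-overlapping subsets.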

For example, if the desired search is all properties in the United States, first filter by country, as in the example above.

https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US

This search can then be split further with the property_rating_min and property_rating_max parameters.

https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=0.0&property_rating_max=0.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=1.0&property_rating_max=1.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=2.0&property_rating_max=2.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=3.0&property_rating_max=3.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=4.0&property_rating_max=4.9
https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US&property_rating_min=5.0

There are now six separate requests, each of which can be paginated independently and in parallel. The same set of data is retrieved, but faster.
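The six URLs above follow a regular pattern, so they can be generated rather than written by hand. A minimal sketch (RatingSplit is a hypothetical name), assuming ratings have at most one decimal place so that every rating falls into exactly one of the ranges:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: splits one search into non-overlapping property_rating ranges.
final class RatingSplit {
    private RatingSplit() { }

    static List<String> urls(String baseSearchUrl) {
        List<String> urls = new ArrayList<>();
        // Ranges 0.0-0.9, 1.0-1.9, ..., 4.0-4.9 do not overlap.
        for (int stars = 0; stars <= 4; stars++) {
            urls.add(String.format(Locale.ROOT,
                    "%s&property_rating_min=%d.0&property_rating_max=%d.9",
                    baseSearchUrl, stars, stars));
        }
        // The top range has no upper bound.
        urls.add(baseSearchUrl + "&property_rating_min=5.0");
        return urls;
    }
}
```

Each resulting URL can then be handed to its own worker and paginated independently.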

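Every situation is different, but start with the desired search and check the pagination-total-results response header on the first page of the response. It reports the total number of results, which indicates whether splitting the search is likely to help.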
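A sketch of that check: read the header from the first response and compare it with a per-call limit you choose. The helper name, the limit and the even-split estimate are assumptions for illustration (the ParallelFileMaker example later in this document uses a limit of 20,000 as MAX_CALL_SIZE).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: decides from the first page's headers whether a search is worth splitting.
final class SplitDecision {
    private SplitDecision() { }

    /** Reads pagination-total-results; HTTP header names are case-insensitive. */
    static int totalResults(Map<String, String> headers) {
        Map<String, String> byName = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        byName.putAll(headers);
        String value = byName.get("pagination-total-results");
        if (value == null) {
            throw new IllegalArgumentException("pagination-total-results header missing");
        }
        return Integer.parseInt(value.trim());
    }

    /** Split when a single call would return maxPerCall results or more. */
    static boolean shouldSplit(Map<String, String> headers, int maxPerCall) {
        return totalResults(headers) >= maxPerCall;
    }

    /** Rough number of sub-searches, assuming results divide evenly (real splits rarely do). */
    static int suggestedParts(int total, int maxPerCall) {
        return Math.max(1, (total + maxPerCall - 1) / maxPerCall);
    }
}
```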

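Code examples

The information above describes pagination and data splitting conceptually; the Java code below gives a more concrete example.

**Note:** The code examples below omit proper exception handling and other best practices. As always, follow all best practices when writing production-ready code.

First, a simple RapidClient class can serve as the basis for making Rapid calls.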

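// Jakarta RESTful Web Services (JAX-RS) with Jersey; older Jersey 2.x versions use javax.ws.rs.* instead.
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.glassfish.jersey.message.GZipEncoder;

import java.text.MessageFormat;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;

public class RapidClient {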
    // Base URL
    private static final String RAPID_BASE_URL = "https://api.ean.com";

    // Headers
    private static final String GZIP = "gzip";
    private static final String AUTHORIZATION_HEADER = "EAN APIKey={0},Signature={1},timestamp={2}";

    // HTTP Client
    private static final Client CLIENT = ClientBuilder.newClient().register(GZipEncoder.class);

    private final String apiKey;
    private final String sharedSecret;

    public RapidClient(String apikey, String sharedSecret) {
        this.apiKey = apikey;
        this.sharedSecret = sharedSecret;
    }

    public Response get(String path, MultivaluedMap<String, String> queryParameters) {
        WebTarget webTarget = CLIENT.target(RAPID_BASE_URL).path(path);

        // Add all query parameters from the map to the web target
        for (Map.Entry<String, List<String>> entry : queryParameters.entrySet()) {
            for (String value : entry.getValue()) {
                webTarget = webTarget.queryParam(entry.getKey(), value);
            }
        }

        return webTarget.request(MediaType.APPLICATION_JSON_TYPE)
                .header(HttpHeaders.ACCEPT_ENCODING, GZIP)
                .header(HttpHeaders.AUTHORIZATION, generateAuthHeader())
                .get();
    }

    private String generateAuthHeader() {
        final String timeStampInSeconds = String.valueOf(ZonedDateTime.now(ZoneOffset.UTC).toEpochSecond());
        final String input = apiKey + sharedSecret + timeStampInSeconds;
        final String signature = DigestUtils.sha512Hex(input);

        return MessageFormat.format(AUTHORIZATION_HEADER, apiKey, signature, timeStampInSeconds);
    }
}

This is just boilerplate that makes the classes that follow easier to read.
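The authorization header built by generateAuthHeader() can also be produced with only the JDK. A minimal sketch, assuming you would rather not depend on commons-codec: it uses java.security.MessageDigest in place of DigestUtils.sha512Hex and otherwise follows the same format as the class above (SHA-512 hex of API key + shared secret + Unix timestamp in seconds). RapidSignature is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical JDK-only equivalent of RapidClient.generateAuthHeader().
final class RapidSignature {
    private RapidSignature() { }

    /** Lowercase hex SHA-512 digest, like DigestUtils.sha512Hex. */
    static String sha512Hex(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-512")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Not expected on a standard JDK, which ships SHA-512.
            throw new IllegalStateException(e);
        }
    }

    /** Same layout as AUTHORIZATION_HEADER in RapidClient. */
    static String authorizationHeader(String apiKey, String sharedSecret, long epochSeconds) {
        String timestamp = Long.toString(epochSeconds);
        String signature = sha512Hex(apiKey + sharedSecret + timestamp);
        return "EAN APIKey=" + apiKey + ",Signature=" + signature + ",timestamp=" + timestamp;
    }
}
```

Pass the current time from Instant.now().getEpochSecond(); the signature is only valid for a limited time, so it should be generated per request, as RapidClient does.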

The next class represents a specific Content API call and uses RapidClient to make it.

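// Jakarta RESTful Web Services (JAX-RS); older Jersey 2.x versions use javax.ws.rs.* instead.
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.core.Response;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// RapidPropertyContent is your own POJO for a single property (assumed to be in the same package).
public class PropertyContentCall {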
    // Path
    private static final String PROPERTY_CONTENT_PATH = "v3/properties/content";

    // Headers
    private static final String LINK = "Link";
    private static final String PAGINATION_TOTAL_RESULTS = "Pagination-Total-Results";

    // Query parameters keys
    private static final String LANGUAGE = "language";
    private static final String SUPPLY_SOURCE = "supply_source";
    private static final String COUNTRY_CODE = "country_code";
    private static final String CATEGORY_ID = "category_id";
    private static final String TOKEN = "token";
    private static final String INCLUDE = "include";

    // Call parameters
    private final RapidClient client;
    private final String language;
    private final String supplySource;
    private final List<String> countryCodes;
    private final List<String> categoryIds;

    private String token;
    // Set once the last page (a response without a Link header) has been read.
    private boolean finished;

    public PropertyContentCall(RapidClient client, String language, String supplySource,
                               List<String> countryCodes, List<String> categoryIds) {
        this.client = client;
        this.language = language;
        this.supplySource = supplySource;
        this.countryCodes = countryCodes;
        this.categoryIds = categoryIds;
    }

    public Stream<RapidPropertyContent> stream() {
        return Stream.generate(() -> {
                    synchronized (this) {
                        // After the last page the token is null again, so without this check the next
                        // request would start over from the first page. Return an empty page to end the stream.
                        if (finished) {
                            return Map.<String, RapidPropertyContent>of();
                        }

                        // Make the call to Rapid.
                        final Response response = client.get(PROPERTY_CONTENT_PATH, queryParameters());

                        // Read the response to return.
                        final Map<String, RapidPropertyContent> propertyContents = response.readEntity(new GenericType<>() { });

                        // Store the token for pagination if we got one; no Link header means this was the last page.
                        token = getTokenFromLink(response.getHeaderString(LINK));
                        finished = token == null;

                        return propertyContents;
                    }
                })
                .takeWhile(MapUtils::isNotEmpty)
                .map(Map::values)
                .flatMap(Collection::stream);
    }

    public Integer size() {
        // Make the call to Rapid.
        final MultivaluedMap<String, String> queryParameters = queryParameters();
        queryParameters.putSingle(INCLUDE, "property_ids");
        final Response response = client.get(PROPERTY_CONTENT_PATH, queryParameters);

        // Read the size to return.
        final Integer size = Integer.parseInt(response.getHeaderString(PAGINATION_TOTAL_RESULTS));

        // Close the response since we're not reading it.
        response.close();

        return size;
    }

    private MultivaluedMap<String, String> queryParameters() {
        final MultivaluedMap<String, String> queryParams = new MultivaluedHashMap<>();

        if (token != null) {
            queryParams.putSingle(TOKEN, token);
        } else {
            // Add required parameters
            queryParams.putSingle(LANGUAGE, language);
            queryParams.putSingle(SUPPLY_SOURCE, supplySource);

            // Add optional parameters
            if (CollectionUtils.isNotEmpty(countryCodes)) {
                queryParams.put(COUNTRY_CODE, countryCodes);
            }
            if (CollectionUtils.isNotEmpty(categoryIds)) {
                queryParams.put(CATEGORY_ID, categoryIds);
            }
        }

        return queryParams;
    }

    private String getTokenFromLink(String linkHeader) {
        if (StringUtils.isEmpty(linkHeader)) {
            return null;
        }

        final int startOfToken = linkHeader.indexOf("=") + 1;
        final int endOfToken = linkHeader.indexOf(">");

        return linkHeader.substring(startOfToken, endOfToken);
    }
}

PropertyContentCall represents a single request to the Rapid Content API and encapsulates the pagination performed for that call.
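The core idea of PropertyContentCall.stream() can be tried without a network connection. The minimal sketch below replaces the Rapid call with a hypothetical fetch function (token in, page out; a null token means the first page) and keeps a finished flag: once the last page has been read the token is null again, so without the flag the next request would start over from the first page instead of ending the stream. LazyPager and Page are illustrative names.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical, network-free model of PropertyContentCall.stream(): pages are fetched
// lazily, one at a time, as the consumer of the stream asks for more items.
final class LazyPager<T> {
    /** One page of results and the token for the next page (null on the last page). */
    record Page<E>(List<E> items, String nextToken) { }

    private final Function<String, Page<T>> fetch; // token -> page; a null token requests the first page
    private String token;
    private boolean finished;
    private int requests;

    LazyPager(Function<String, Page<T>> fetch) {
        this.fetch = fetch;
    }

    Stream<T> stream() {
        return Stream.generate(this::nextPage)
                .takeWhile(items -> !items.isEmpty()) // an empty page ends the stream
                .flatMap(List::stream);
    }

    /** Number of pages actually fetched so far. */
    synchronized int requests() {
        return requests;
    }

    private synchronized List<T> nextPage() {
        if (finished) {
            // The token is null again after the last page; without this check page one would be refetched.
            return List.of();
        }
        Page<T> page = fetch.apply(token);
        requests++;
        token = page.nextToken();
        finished = token == null;
        return page.items();
    }
}
```

In PropertyContentCall the fetch step is the Rapid call and the token comes from the Link header; the stream logic is the same.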

Example:

Compare the API call below with the equivalent Java request.

https://api.ean.com/v3/properties/content?language=en-US&supply_source=expedia&country_code=US
PropertyContentCall request = new PropertyContentCall(myRapidClient, "en-US", "expedia", List.of("US"), null);
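  • The PropertyContentCall used here is specific to this example. Calls are broken down by country_code and category_id, but this can be changed to suit your use case. Because it was written specifically for parallelization, this example uses Java parallel streams. The public stream() method returns a stream of RapidPropertyContent objects. A RapidPropertyContent object is simply a POJO representing a single property from a Rapid Content API call. Although Java parallel streams are used here, any way of running code in parallel will work.
  • When the code calling stream() needs to read another property from the stream, the method provides it if it has already been retrieved; otherwise it calls the Rapid Content API for the next page of results and returns a property from that page. Simply calling stream() and reading it to the end handles pagination for every property returned by the request.
  • Another public helper method, size(), provides a convenient way to see the total number of properties this PropertyContentCall will return. This helps determine whether the call is already small enough or needs to be split further into smaller calls for parallelization.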

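The building blocks above provide the foundation for calling Rapid and paginating through the responses. The code below uses those classes to automatically split the call into manageable pieces, page through all of the smaller calls in parallel, and write the combined output to a file.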

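import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;

public class ParallelFileMaker {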
    private static final String APIKEY = System.getenv().get("RAPID_APIKEY");
    private static final String SHARED_SECRET = System.getenv().get("RAPID_SHARED_SECRET");
    private static final List<String> COUNTRIES = Arrays.asList("AD", "AE", "AF", "AG", "AI", "AL", "AM", "AO", "AQ",
            "AR", "AS", "AT", "AU", "AW", "AX", "AZ", "BA", "BB", "BD", "BE", "BF", "BG", "BH", "BI", "BJ", "BL", "BM",
            "BN", "BO", "BQ", "BR", "BS", "BT", "BV", "BW", "BY", "BZ", "CA", "CC", "CD", "CF", "CG", "CH", "CI", "CK",
            "CL", "CM", "CN", "CO", "CR", "CU", "CV", "CW", "CX", "CY", "CZ", "DE", "DJ", "DK", "DM", "DO", "DZ", "EC",
            "EE", "EG", "EH", "ER", "ES", "ET", "FI", "FJ", "FK", "FM", "FO", "FR", "GA", "GB", "GD", "GE", "GF", "GG",
            "GH", "GI", "GL", "GM", "GN", "GP", "GQ", "GR", "GS", "GT", "GU", "GW", "GY", "HK", "HM", "HN", "HR", "HT",
            "HU", "ID", "IE", "IL", "IM", "IN", "IO", "IQ", "IR", "IS", "IT", "JE", "JM", "JO", "JP", "KE", "KG", "KH",
            "KI", "KM", "KN", "KP", "KR", "KW", "KY", "KZ", "LA", "LB", "LC", "LI", "LK", "LR", "LS", "LT", "LU", "LV",
            "LY", "MA", "MC", "MD", "ME", "MF", "MG", "MH", "MK", "ML", "MM", "MN", "MO", "MP", "MQ", "MR", "MS", "MT",
            "MU", "MV", "MW", "MX", "MY", "MZ", "NA", "NC", "NE", "NF", "NG", "NI", "NL", "NO", "NP", "NR", "NU", "NZ",
            "OM", "PA", "PE", "PF", "PG", "PH", "PK", "PL", "PM", "PN", "PR", "PS", "PT", "PW", "PY", "QA", "RE", "RO",
            "RS", "RU", "RW", "SA", "SB", "SC", "SD", "SE", "SG", "SH", "SI", "SJ", "SK", "SL", "SM", "SN", "SO", "SR",
            "SS", "ST", "SV", "SX", "SY", "SZ", "TC", "TD", "TF", "TG", "TH", "TJ", "TK", "TL", "TM", "TN", "TO", "TR",
            "TT", "TV", "TW", "TZ", "UA", "UG", "UM", "US", "UY", "UZ", "VA", "VC", "VE", "VG", "VI", "VN", "VU", "WF",
            "WS", "YE", "YT", "ZA", "ZM", "ZW");
    private static final List<String> PROPERTY_CATEGORIES = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8",
            "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "20", "21", "22", "23", "24", "25", "26",
            "29", "30", "31", "32", "33", "34", "36", "37", "39", "40", "41", "42", "43", "44");
    private static final int MAX_CALL_SIZE = 20_000;
    private static final String LANGUAGE = "en-US";
    private static final String SUPPLY_SOURCE = "expedia";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .registerModule(new JavaTimeModule());
    private static final RapidClient RAPID_CLIENT = new RapidClient(APIKEY, SHARED_SECRET);

    public void run() throws IOException {
        final Map<PropertyContentCall, Integer> allCalls = divideUpCalls();

        // Make sure we're making the calls in the most efficient order. This list will be smallest to largest, so
        // that when the streams get combined and are reversed, the largest stream will be first.
        final List<Stream<RapidPropertyContent>> callsToMake = allCalls.entrySet().stream()
                .filter(entry -> entry.getValue() > 0) // filter out any calls that don't have results
                .sorted(Map.Entry.comparingByValue()) // sort all the calls with the smallest calls first
                .map(Map.Entry::getKey) // just need the call itself now
                .map(PropertyContentCall::stream) // get the stream for each call
                .toList();

        // Combine all the streams into one big stream and actually make the calls and write to the file.
        try (Stream<RapidPropertyContent> bigStream = combineStreams(callsToMake);
             BufferedWriter outputFileWriter = createFileWriter(Path.of("output.jsonl.gz"))) {
            bigStream.parallel()
                    .forEach(property -> {
                        try {
                            // Write to output file
                            synchronized (outputFileWriter) {
                                outputFileWriter.append(OBJECT_MAPPER.writeValueAsString(property));
                                outputFileWriter.newLine();
                            }
                        } catch (Exception e) {
                            // Handle exception
                        }
                    });
        }
    }

    /**
     * This will split up the calls to be made based on the size of each call's results. It will first split into
     * calls per country and, if needed, it will then further split into calls per category for any country that is
     * too big on its own.
     * The size of each call is also kept so that the calls can be further sorted if needed.
     *
     * @return A map containing all the calls and their respective sizes.
     */
    private Map<PropertyContentCall, Integer> divideUpCalls() {
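        // This map is filled from parallel streams, so it must be thread-safe (a plain HashMap is not).
        final Map<PropertyContentCall, Integer> allCalls = new ConcurrentHashMap<>();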
        COUNTRIES.stream().parallel()
                .forEach(countryCode -> {
                    // Check to see if the entire country is small enough to get at once.
                    final PropertyContentCall countryCall = new PropertyContentCall(RAPID_CLIENT, LANGUAGE,
                            SUPPLY_SOURCE, List.of(countryCode), null);
                    final Integer countryCallSize = countryCall.size();

                    if (countryCallSize < MAX_CALL_SIZE) {
                        // It's small enough! No need to break this call up further.
                        allCalls.put(countryCall, countryCallSize);
                    } else {
                        // The country is too big, need to break up the call into smaller parts.
                        PROPERTY_CATEGORIES.stream().parallel()
                                .forEach(category -> {
                                    final PropertyContentCall categoryCall = new PropertyContentCall(RAPID_CLIENT,
                                            LANGUAGE, SUPPLY_SOURCE, List.of(countryCode), List.of(category));

                                    allCalls.put(categoryCall, categoryCall.size());
                                });
                    }
                });

        return allCalls;
    }

    /**
     * This will combine multiple Streams into a single Stream. Because of how this is reduced, the Streams will end
     * up in the reverse order of the list that was passed in.
     * <p>
     * Note: Because this is concatenating multiple Streams together, each Stream will go on the stack. Thus, if
     * there are many Streams then a StackOverflowError can occur when trying to use the combined Stream. Make
     * sure the stack size is appropriate for your usage via the `-Xss` JVM parameter.
     *
     * @param streams A list of the Streams to combine.
     * @return The combined Stream that can be treated as one.
     */
    private <T> Stream<T> combineStreams(List<Stream<T>> streams) {
        return streams.stream()
                .filter(Objects::nonNull)
                .reduce(Stream::concat)
                .orElse(Stream.empty());
    }

    private BufferedWriter createFileWriter(Path path) throws IOException {
        return new BufferedWriter(
                new OutputStreamWriter(
                        new GZIPOutputStream(
                                Files.newOutputStream(path)),
                        StandardCharsets.UTF_8));
    }
}

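Although the code above has many inline comments explaining the individual parts, it can be summarized as follows:

  1. Split the main call into smaller calls based on the use case. (In this example, the main call fetches all content and is split by country_code and, where needed, further by category_id.)
  2. Sort the calls so that they run more efficiently, given the way this example combines the parallel streams.
  3. Run the calls in parallel and write the properties they return to a file.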
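The Stream.concat documentation warns that deeply concatenated streams can cause a StackOverflowError, which is why combineStreams needs an appropriate -Xss. One alternative, sketched below under the assumption that each call's pagination is sequential anyway, is to parallelize over the list of calls and flatten with flatMap: in the JDK's implementation each flattened stream is consumed by the worker that mapped it, and no nested concat chain is built. ParallelCalls is a hypothetical name, and this is a different approach from the example above rather than an equivalent refactoring.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Hypothetical alternative to combineStreams(): parallelize over calls instead of concatenating their streams.
final class ParallelCalls {
    private ParallelCalls() { }

    /**
     * Pages through every call, running different calls on different worker threads.
     * The sink is invoked concurrently, so it must be thread-safe.
     */
    static <T> void forEachResult(List<Supplier<Stream<T>>> calls, Consumer<? super T> sink) {
        calls.parallelStream()
                // Each call's stream is created and consumed by the worker that picks up the call;
                // flatMap also closes each stream after its contents have been consumed.
                .flatMap(Supplier::get)
                .forEach(sink);
    }
}
```

With the classes above, a method reference such as call::stream on a PropertyContentCall serves as the supplier, and the sink can be the synchronized file write from run().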