RAG系统最大收益

题目分析

RAG 系统需要在 个连续自然日内规划操作,使净收益最大化。知识库初始为"过期"状态,在第 天执行更新后,知识库在 天内有效。每天最多执行一项操作:更新并查询(付出 ,获得 )、仅查询(知识库有效时获得 )、或什么都不做。

思路

DP + 单调队列优化

核心观察:知识库的有效性完全取决于最近一次更新的时间。因此可以围绕"更新日"来设计 DP。

表示在第 天执行更新时,从第 天到第 天能获得的最大净收益(包含第 天的更新成本和查询收益)。

基础情况: 天是第一次更新(之前没有任何更新),则

状态转移: 假设上一次更新在第 天,则从第 天到第 天中,知识库有效的天数可以免费查询。根据 的距离分两种情况:

  • 情况 1(第 天的更新覆盖到第 天),所有中间天都能免费查询。转移为:

$$

。需要在滑动窗口 内最大化 ,用单调队列维护。

  • 情况 2(第 天的更新在第 天之前已过期),只有 天可以免费查询。转移为:

$$

其中 只依赖 ,维护一个前缀最大值即可。

最终答案: 遍历所有可能的最后一次更新日 ,加上更新后剩余有效天的免费查询收益,即:

$$

是因为可以选择什么都不做。

以样例验证:

  • ,更新后有效到第 天,免费查询第 天收益 ,总计
  • :从 转移(情况 1,),,更新后有效到第 天,免费查询第 天收益 ,总计
  • 最优策略:更新第 天(),更新第 天(),免费查询第 天(),总计

复杂度

  • 时间复杂度:,每个元素最多入队出队一次
  • 空间复杂度:,存储 DP 数组、前缀和数组和单调队列

代码

import sys
from collections import deque

def main():
    input_data = sys.stdin.buffer.read().split()
    idx = 0
    n = int(input_data[idx]); idx += 1
    d = int(input_data[idx]); idx += 1
    update_cost = [int(input_data[idx + i]) for i in range(n)]; idx += n
    query_reward = [int(input_data[idx + i]) for i in range(n)]; idx += n

    # prefix[i] = query_reward[0] + ... + query_reward[i-1]
    prefix = [0] * (n + 1)
    for i in range(n):
        prefix[i + 1] = prefix[i] + query_reward[i]

    # dp[i] = 在第 i 天更新时,第 0~i 天的最大净收益
    dp = [0] * n
    for i in range(n):
        dp[i] = query_reward[i] - update_cost[i]

    # 单调队列维护情况 1 的滑动窗口最大值
    dq = deque()
    # 前缀最大值维护情况 2
    best_case2 = float('-inf')

    for i in range(n):
        if i >= 1:
            j = i - 1
            val = dp[j] - prefix[j + 1]
            while dq and dp[dq[-1]] - prefix[dq[-1] + 1] <= val:
                dq.pop()
            dq.append(j)

        if i - d - 1 >= 0:
            j2 = i - d - 1
            v2 = dp[j2] + prefix[j2 + d] - prefix[j2 + 1]
            if v2 > best_case2:
                best_case2 = v2

        while dq and dq[0] < i - d:
            dq.popleft()

        best = dp[i]

        if dq:
            j_best = dq[0]
            val = dp[j_best] - prefix[j_best + 1] + prefix[i] + query_reward[i] - update_cost[i]
            if val > best:
                best = val

        if best_case2 > float('-inf'):
            val = best_case2 + query_reward[i] - update_cost[i]
            if val > best:
                best = val

        dp[i] = best

    ans = 0
    for i in range(n):
        end = min(i + d, n)
        total = dp[i] + prefix[end] - prefix[i + 1]
        if total > ans:
            ans = total

    print(ans)

main()