题目链接

任务批处理负载均衡

题目描述

一个云计算中心有 个连续的微任务,每个任务有计算成本 。需要将这 个任务按顺序划分成 个批次。目标是找到一种划分方案,使得这 个批次的总计算成本(批次内所有任务成本之和)的标准差最小。

输入:

  • 第一行包含两个整数
  • 第二行包含 个正整数,代表每个任务的计算成本。

输出:

  • 输出一行 个整数,代表最优划分方案中,每个批次包含的任务数量。

解题思路

本题的核心是找到一种任务划分方式,使得批次成本的标准差最小。

首先,我们来分析一下目标函数:标准差。设 个批次的成本和分别为 。 这 个值的平均值为 。 由于所有任务都必须被处理,批次成本的总和是一个定值,等于所有任务成本的总和,我们记为 。因此,平均值 也是一个定值。

方差是标准差的平方,其计算公式为 。 最小化标准差等价于最小化方差。 展开方差公式: 代入 由于 都是定值,所以要最小化方差 ,我们只需要最小化批次成本的平方和 即可。

问题转化为了:将一个序列划分成 个连续子段,使得所有子段和的平方和最小。这是一个典型的动态规划问题。

我们可以定义一个二维 DP 数组: 表示将前 个任务划分成 个批次所能得到的最小的“批次成本平方和”。

为了得到 ,我们可以考虑第 个批次(也就是最后一个批次)包含了哪些任务。假设第 个批次从第 个任务开始,一直到第 个任务。那么前 个任务就需要被划分为 个批次。 这引出了状态转移方程:

为了快速计算 ,我们可以预处理一个前缀和数组

同时,为了最终能输出每个批次包含的任务数,我们还需要一个 数组来记录下使得 取到最小值的那个分割点 。当 DP 表格填充完毕后,我们可以通过从 开始回溯,找出最优划分方案。

算法步骤:

  1. 计算任务成本的前缀和数组。
  2. 初始化 数组为一个极大值,
  3. 使用三层循环填充 数组。
  4. 根据 数组从后向前回溯,得到每个批次的大小。
  5. 输出结果。

代码

#include <iostream>
#include <vector>
#include <numeric>
#include <algorithm>

using namespace std;

int main() {
    int n, m;
    cin >> n >> m;
    vector<long long> costs(n);
    for (int i = 0; i < n; ++i) {
        cin >> costs[i];
    }

    // 预处理前缀和
    vector<long long> prefix_sum(n + 1, 0);
    for (int i = 0; i < n; ++i) {
        prefix_sum[i + 1] = prefix_sum[i] + costs[i];
    }

    // dp[i][j]: 将前 i 个任务划分成 j 个批次的最小平方和
    vector<vector<long long>> dp(n + 1, vector<long long>(m + 1, -1));
    // path[i][j]: 记录最优决策点 k
    vector<vector<int>> path(n + 1, vector<int>(m + 1, 0));

    dp[0][0] = 0;

    // j 是批次数
    for (int j = 1; j <= m; ++j) {
        // i 是任务数
        for (int i = j; i <= n; ++i) {
            // k 是上一个批次的结束位置
            for (int k = j - 1; k < i; ++k) {
                if (dp[k][j - 1] == -1) continue;
                long long current_cost = prefix_sum[i] - prefix_sum[k];
                long long current_val = dp[k][j - 1] + current_cost * current_cost;
                if (dp[i][j] == -1 || current_val < dp[i][j]) {
                    dp[i][j] = current_val;
                    path[i][j] = k;
                }
            }
        }
    }

    // 回溯路径,得到每个批次的大小
    vector<int> result;
    int current_n = n;
    for (int j = m; j > 0; --j) {
        int prev_n = path[current_n][j];
        result.push_back(current_n - prev_n);
        current_n = prev_n;
    }
    reverse(result.begin(), result.end());

    for (int i = 0; i < result.size(); ++i) {
        cout << result[i] << (i == result.size() - 1 ? "" : " ");
    }
    cout << "\n";

    return 0;
}
import java.util.Scanner;
import java.util.ArrayList;
import java.util.Collections;

public class Main {
    public static void main(String[] args) {
        Scanner sc = new Scanner(System.in);
        int n = sc.nextInt();
        int m = sc.nextInt();
        long[] costs = new long[n];
        for (int i = 0; i < n; i++) {
            costs[i] = sc.nextLong();
        }

        // 预处理前缀和
        long[] prefixSum = new long[n + 1];
        for (int i = 0; i < n; i++) {
            prefixSum[i + 1] = prefixSum[i] + costs[i];
        }

        // dp[i][j]: 将前 i 个任务划分成 j 个批次的最小平方和
        long[][] dp = new long[n + 1][m + 1];
        // path[i][j]: 记录最优决策点 k
        int[][] path = new int[n + 1][m + 1];

        for (int i = 0; i <= n; i++) {
            for (int j = 0; j <= m; j++) {
                dp[i][j] = -1;
            }
        }
        dp[0][0] = 0;

        // j 是批次数
        for (int j = 1; j <= m; j++) {
            // i 是任务数
            for (int i = j; i <= n; i++) {
                // k 是上一个批次的结束位置
                for (int k = j - 1; k < i; k++) {
                    if (dp[k][j - 1] == -1) continue;
                    long currentCost = prefixSum[i] - prefixSum[k];
                    long currentVal = dp[k][j - 1] + currentCost * currentCost;
                    if (dp[i][j] == -1 || currentVal < dp[i][j]) {
                        dp[i][j] = currentVal;
                        path[i][j] = k;
                    }
                }
            }
        }

        // 回溯路径,得到每个批次的大小
        ArrayList<Integer> result = new ArrayList<>();
        int currentN = n;
        for (int j = m; j > 0; j--) {
            int prevN = path[currentN][j];
            result.add(currentN - prevN);
            currentN = prevN;
        }
        Collections.reverse(result);

        for (int i = 0; i < result.size(); i++) {
            System.out.print(result.get(i) + (i == result.size() - 1 ? "" : " "));
        }
        System.out.println();
    }
}
import sys

# 读取输入
n, m = map(int, input().split())
costs = list(map(int, input().split()))

# 预处理前缀和
prefix_sum = [0] * (n + 1)
for i in range(n):
    prefix_sum[i + 1] = prefix_sum[i] + costs[i]

# dp[i][j]: 将前 i 个任务划分成 j 个批次的最小平方和
# 初始化为极大值
dp = [[-1] * (m + 1) for _ in range(n + 1)]
# path[i][j]: 记录最优决策点 k
path = [[0] * (m + 1) for _ in range(n + 1)]

dp[0][0] = 0

# j 是批次数
for j in range(1, m + 1):
    # i 是任务数
    for i in range(j, n + 1):
        # k 是上一个批次的结束位置
        for k in range(j - 1, i):
            if dp[k][j - 1] == -1:
                continue
            current_cost = prefix_sum[i] - prefix_sum[k]
            current_val = dp[k][j - 1] + current_cost * current_cost
            if dp[i][j] == -1 or current_val < dp[i][j]:
                dp[i][j] = current_val
                path[i][j] = k

# 回溯路径,得到每个批次的大小
result = []
current_n = n
for j in range(m, 0, -1):
    prev_n = path[current_n][j]
    result.append(current_n - prev_n)
    current_n = prev_n

result.reverse()
print(*result)

算法及复杂度

  • 算法:动态规划
  • 时间复杂度:,其中 是任务总数, 是批次数。DP 状态的计算需要三层循环。
  • 空间复杂度:,用于存储 表和 表。