设备故障预测程序
题意
给定 条设备日志训练数据,每条包含设备ID、写入次数、读取次数、平均写延迟、平均读延迟、使用年限和设备状态(0正常/1故障)。对数据做清洗后,用逻辑回归模型训练,再对
条测试数据预测故障状态。
数据清洗规则:
- 字符串
"NaN"用该列有效值的均值替换 - 越界值用该列有效值的中位数替换
- 有效值 = 非NaN且在合法范围内的值
- 标签缺失或无法解析的训练样本直接丢弃
模型:带偏置的二元逻辑回归,批量梯度下降,学习率 ,迭代
次,初始权重全
。
思路
这道题本质上不考算法,而是考工程实现的准确性——能不能严格按规则清洗数据、正确实现梯度下降。
把整个流程拆成四步来想:
第一步:收集有效值。 遍历训练集,对每个特征列,找出所有既不是 NaN 又在合法范围内的值。有效值用来算均值和中位数,为后面的清洗做准备。
第二步:数据清洗。 再遍历一遍训练集(和测试集),对每个特征值做判断:
- 解析失败或是 NaN?用均值填充。
- 解析成功但越界?用中位数替换。
- 其他情况保持原值。
同时检查训练集的标签列,解析不了的整行丢掉。
第三步:逻辑回归训练。 这里要注意梯度下降的更新公式。设 ,预测值
,则梯度为:
$$
其中 对应的
恒为
。每轮用全部样本算梯度再更新,这就是"批量"梯度下降。
第四步:预测。 对测试样本算 ,
输出
,否则输出
。
一个容易踩的坑:计算均值和中位数时,只用有效值(同时满足非NaN和在范围内),而不是分别处理。如果某列完全没有有效值,均值和中位数都用 。
时间复杂度 ,其中
是迭代次数,
是特征数,
,非常小。
代码
import sys
import math
def solve():
input_data = sys.stdin.read().split('\n')
idx = 0
N = int(input_data[idx].strip())
idx += 1
raw_train = []
for i in range(N):
parts = input_data[idx].strip().split(',')
idx += 1
raw_train.append(parts)
M = int(input_data[idx].strip())
idx += 1
raw_test = []
for i in range(M):
parts = input_data[idx].strip().split(',')
idx += 1
raw_test.append(parts)
NF = 5
def is_nan(s):
return s.strip().lower() == 'nan'
def parse_float(s):
if is_nan(s.strip()):
return None
try:
return float(s)
except:
return None
def in_bounds(f, val):
if f <= 1: # writes, reads: >= 0
return val >= 0
if f <= 3: # avg_write_ms, avg_read_ms: [0, 1000]
return 0 <= val <= 1000
return 0 <= val <= 20 # years: [0, 20]
# 收集有效值
valid_values = [[] for _ in range(NF)]
for parts in raw_train:
for f in range(NF):
val = parse_float(parts[f + 1])
if val is not None and in_bounds(f, val):
valid_values[f].append(val)
# 算均值和中位数
feat_mean = [0.0] * NF
feat_median = [0.0] * NF
for f in range(NF):
vv = valid_values[f]
if not vv:
continue
feat_mean[f] = sum(vv) / len(vv)
sv = sorted(vv)
n = len(sv)
feat_median[f] = sv[n // 2] if n % 2 == 1 else (sv[n // 2 - 1] + sv[n // 2]) / 2
# 清洗特征
def clean(parts):
feats = []
for f in range(NF):
val = parse_float(parts[f + 1])
if val is None:
feats.append(feat_mean[f])
elif not in_bounds(f, val):
feats.append(feat_median[f])
else:
feats.append(val)
return feats
# 构建训练集
X, y = [], []
for parts in raw_train:
if len(parts) < 7:
continue
try:
label = int(parts[6].strip())
except:
continue
X.append(clean(parts))
y.append(label)
X_test = [clean(p) for p in raw_test]
# 逻辑回归
w = [0.0] * (NF + 1)
lr = 0.01
n = len(X)
def sigmoid(z):
if z > 500: return 1.0
if z < -500: return 0.0
return 1.0 / (1.0 + math.exp(-z))
for _ in range(100):
grads = [0.0] * (NF + 1)
for i in range(n):
z = w[0] + sum(w[f + 1] * X[i][f] for f in range(NF))
err = sigmoid(z) - y[i]
grads[0] += err
for f in range(NF):
grads[f + 1] += err * X[i][f]
for j in range(NF + 1):
w[j] -= lr * grads[j] / n
# 预测
for feats in X_test:
z = w[0] + sum(w[f + 1] * feats[f] for f in range(NF))
print(1 if sigmoid(z) >= 0.5 else 0)
solve()
#include <bits/stdc++.h>
using namespace std;
const int NF = 5;
bool inBounds(int f, double val) {
if (f <= 1) return val >= 0;
if (f <= 3) return val >= 0 && val <= 1000;
return val >= 0 && val <= 20;
}
double sigmoid(double z) {
if (z > 500) return 1.0;
if (z < -500) return 0.0;
return 1.0 / (1.0 + exp(-z));
}
int main() {
int N;
scanf("%d", &N);
vector<vector<string>> rawTrain(N);
char buf[1024];
for (int i = 0; i < N; i++) {
scanf("%s", buf);
stringstream ss(buf);
string token;
while (getline(ss, token, ','))
rawTrain[i].push_back(token);
}
int M;
scanf("%d", &M);
vector<vector<string>> rawTest(M);
for (int i = 0; i < M; i++) {
scanf("%s", buf);
stringstream ss(buf);
string token;
while (getline(ss, token, ','))
rawTest[i].push_back(token);
}
auto isNan = [](const string& s) {
string t = s;
transform(t.begin(), t.end(), t.begin(), ::tolower);
return t == "nan";
};
auto parseDouble = [&](const string& s, bool& ok) -> double {
ok = false;
if (isNan(s)) return 0;
try { double v = stod(s); ok = true; return v; }
catch (...) { return 0; }
};
vector<vector<double>> validVals(NF);
for (auto& parts : rawTrain) {
for (int f = 0; f < NF; f++) {
bool ok;
double v = parseDouble(parts[f + 1], ok);
if (ok && inBounds(f, v))
validVals[f].push_back(v);
}
}
vector<double> fm(NF, 0), fmed(NF, 0);
for (int f = 0; f < NF; f++) {
auto& vv = validVals[f];
if (vv.empty()) continue;
double s = 0;
for (double x : vv) s += x;
fm[f] = s / vv.size();
sort(vv.begin(), vv.end());
int n = vv.size();
fmed[f] = (n % 2) ? vv[n / 2] : (vv[n / 2 - 1] + vv[n / 2]) / 2.0;
}
auto clean = [&](vector<string>& parts) {
vector<double> feats(NF);
for (int f = 0; f < NF; f++) {
bool ok;
double val = parseDouble(parts[f + 1], ok);
if (!ok) feats[f] = fm[f];
else if (!inBounds(f, val)) feats[f] = fmed[f];
else feats[f] = val;
}
return feats;
};
vector<vector<double>> X;
vector<int> y;
for (auto& parts : rawTrain) {
if ((int)parts.size() < 7) continue;
try { y.push_back(stoi(parts[6])); }
catch (...) { continue; }
X.push_back(clean(parts));
}
vector<vector<double>> Xt;
for (auto& p : rawTest) Xt.push_back(clean(p));
vector<double> w(NF + 1, 0.0);
int n = X.size();
for (int iter = 0; iter < 100; iter++) {
vector<double> g(NF + 1, 0.0);
for (int i = 0; i < n; i++) {
double z = w[0];
for (int f = 0; f < NF; f++) z += w[f + 1] * X[i][f];
double err = sigmoid(z) - y[i];
g[0] += err;
for (int f = 0; f < NF; f++) g[f + 1] += err * X[i][f];
}
for (int j = 0; j <= NF; j++)
w[j] -= 0.01 * g[j] / n;
}
for (auto& feats : Xt) {
double z = w[0];
for (int f = 0; f < NF; f++) z += w[f + 1] * feats[f];
printf("%d\n", sigmoid(z) >= 0.5 ? 1 : 0);
}
}

京公网安备 11010502036488号