FP-Growth Algorithm: Custom Extended Metrics (certainty, conviction)
Extending FP-Growth with Additional Metrics
Short answer: Spark MLlib's FP-Growth implementation does not provide certainty or conviction metrics out of the box, but we can easily compute them ourselves!
1. Standard Metrics vs. Extended Metrics
Default metrics provided by Spark FP-Growth:
| Metric | Description | Formula |
|--------|-------------|---------|
| Support | How frequently the itemset appears | P(A∪B) |
| Confidence | Reliability of the rule | P(B\|A) = support(A∪B) / support(A) |
| Lift | Effectiveness of the rule | P(B\|A) / P(B) |
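These columns come straight off the fitted model. As a quick orientation, here is a minimal sketch of where they live (`transactions_df` is a placeholder name for any DataFrame with an array-typed `items` column, like the one built in the full example below):

```python
from pyspark.ml.fpm import FPGrowth

# Fit FP-Growth on a DataFrame with an array<string> "items" column
fp = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.3)
model = fp.fit(transactions_df)

model.freqItemsets.show()       # columns: items, freq
model.associationRules.show()   # columns: antecedent, consequent, confidence, lift
```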
Extended metrics we can compute ourselves:
| Metric | Description | Formula |
|--------|-------------|---------|
| Certainty | Credibility of the rule in both directions | (confidence(A→B) + confidence(B→A)) / 2 |
| Conviction | Inverse measure of rule failure | (1 - support(B)) / (1 - confidence(A→B)) |
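Before wiring these into Spark, here is a plain-Python sketch of the two formulas above (the names `sup_a`, `sup_b`, `sup_ab` are ours, standing for support(A), support(B), and support(A∪B)):

```python
def certainty(sup_a, sup_b, sup_ab):
    """(confidence(A→B) + confidence(B→A)) / 2, per the definition above."""
    conf_ab = sup_ab / sup_a if sup_a > 0 else 0.0
    conf_ba = sup_ab / sup_b if sup_b > 0 else 0.0
    return (conf_ab + conf_ba) / 2

def conviction(sup_a, sup_b, sup_ab):
    """(1 - support(B)) / (1 - confidence(A→B)); infinite for perfect rules."""
    conf_ab = sup_ab / sup_a if sup_a > 0 else 0.0
    return float("inf") if conf_ab >= 1 else (1 - sup_b) / (1 - conf_ab)
```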
2. Complete Code: Computing All Metrics
import sys
import os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import split, col, udf, array_contains, size, lit
from pyspark.ml.fpm import FPGrowth
import pandas as pd
# Create the SparkSession
spark = (SparkSession.builder
         .appName("FP_Growth_Extended_Metrics")
         .master("local[2]")
         .getOrCreate())
# Market-basket data
data = [
(1, 1001, "2024-01-01", "milk,bread,butter"),
(2, 1001, "2024-01-01", "bread,eggs,juice"),
(3, 1002, "2024-01-01", "milk,bread,beer,diapers"),
(4, 1003, "2024-01-02", "bread,butter,cheese"),
(5, 1004, "2024-01-02", "milk,bread,eggs,beer"),
(6, 1004, "2024-01-02", "beer,diapers"),
(7, 1005, "2024-01-03", "milk,bread,diapers,beer"),
(8, 1006, "2024-01-03", "bread,butter,milk"),
(9, 1007, "2024-01-03", "eggs,bread,milk"),
(10, 1008, "2024-01-04", "diapers,beer,milk"),
(11, 1009, "2024-01-04", "bread,milk,butter,eggs"),
(12, 1010, "2024-01-04", "beer,diapers,milk,bread"),
(13, 1011, "2024-01-05", "bread,cheese,wine"),
(14, 1012, "2024-01-05", "milk,bread,eggs,butter"),
(15, 1013, "2024-01-05", "diapers,beer,bread"),
]
# Create the DataFrame
df = spark.createDataFrame(data, ["transaction_id", "customer_id", "date", "items_str"])
df = df.withColumn("items", split(col("items_str"), ","))
print("=" * 80)
print("FP-Growth 算法扩展指标分析")
print("=" * 80)
print("
1. 运行标准 FP-Growth 算法")
print("-" * 40)
# 运行FP-Growth
fp = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.3)
model = fp.fit(df)
# Retrieve frequent itemsets and association rules
freq_itemsets = model.freqItemsets
association_rules = model.associationRules
print(f"发现 {freq_itemsets.count()} 个频繁项集")
print(f"发现 {association_rules.count()} 条关联规则")
print("
2. 计算扩展指标")
print("-" * 40)
# 创建支持度字典以便快速查找
support_dict = {}
for row in freq_itemsets.collect():
# 将列表转换为元组以便哈希
itemset_key = tuple(sorted(row['items']))
support_dict[itemset_key] = row['freq'] / df.count()
# 计算 certainty 和 conviction 的自定义函数
def calculate_extended_metrics(antecedent, consequent, confidence, lift):
"""
计算扩展指标
"""
# 查找支持度
antecedent_key = tuple(sorted(antecedent))
consequent_key = tuple(sorted(consequent))
union_key = tuple(sorted(set(antecedent).union(consequent)))
support_antecedent = support_dict.get(antecedent_key, 0)
support_consequent = support_dict.get(consequent_key, 0)
support_union = support_dict.get(union_key, 0)
    # Reverse confidence: confidence(B→A)
    if support_consequent > 0:
        confidence_reverse = support_union / support_consequent
    else:
        confidence_reverse = 0
    # 1. Certainty: average of the two directional confidences
    certainty = (confidence + confidence_reverse) / 2
    # 2. Conviction: infinite for perfect rules (confidence == 1)
    if confidence >= 1:
        conviction = float('inf')
    else:
        conviction = (1 - support_consequent) / (1 - confidence)
    # 3. Leverage: observed minus expected co-occurrence
    leverage = support_union - (support_antecedent * support_consequent)
    # 4. Coverage: support of the antecedent
    coverage = support_antecedent
    return {
        'certainty': round(certainty, 4),
        'conviction': round(conviction, 4),  # round() with ndigits keeps inf as inf
        'leverage': round(leverage, 4),
        'coverage': round(coverage, 4),
        'confidence_reverse': round(confidence_reverse, 4)
    }
# Apply the extended-metric computation
from pyspark.sql.types import ArrayType, FloatType
# Wrap the computation so it can be registered as a UDF
def calculate_metrics_udf(antecedent, consequent, confidence, lift):
    metrics = calculate_extended_metrics(list(antecedent), list(consequent), confidence, lift)
    return [float(metrics['certainty']),
            float(metrics['conviction']),
            float(metrics['leverage']),
            float(metrics['coverage']),
            float(metrics['confidence_reverse'])]
# Register the UDF with its return type
calculate_metrics_udf_spark = udf(calculate_metrics_udf, ArrayType(FloatType()))
# Append the extended metrics to each rule
rules_with_metrics = association_rules.withColumn(
"extended_metrics",
calculate_metrics_udf_spark("antecedent", "consequent", "confidence", "lift")
)
# Split the metrics array into separate columns
rules_extended = rules_with_metrics.select(
"antecedent",
"consequent",
"confidence",
"lift",
(col("extended_metrics")[0]).alias("certainty"),
(col("extended_metrics")[1]).alias("conviction"),
(col("extended_metrics")[2]).alias("leverage"),
(col("extended_metrics")[3]).alias("coverage"),
(col("extended_metrics")[4]).alias("confidence_reverse")
)
print("
3. 完整指标分析")
print("-" * 40)
# 显示所有指标
print("关联规则完整指标表:")
rules_extended.orderBy("confidence", ascending=False).show(truncate=False)
print("
4. 指标解释和业务意义")
print("-" * 40)
# 收集规则进行分析
rules_list = rules_extended.collect()
if len(rules_list) > 0:
print("
指标含义说明:")
print("1. 支持度 (Support): 规则中的商品组合出现的频率")
print("2. 置信度 (Confidence): 购买A的情况下购买B的概率")
print("3. 提升度 (Lift): 规则的有效性,>1表明正相关")
print("4. 确定性 (Certainty): 双向可信度的平均值")
print("5. 确信度 (Conviction): 规则的反向度量,越高越好")
print("6. 杠杆率 (Leverage): 实际联合出现与期望的差异")
print("7. 覆盖度 (Coverage): 前件商品的流行度")
print("
高质量规则示例:")
print("-" * 30)
    for i, rule in enumerate(rules_list[:5], 1):
        antecedent = rule['antecedent']
        consequent = rule['consequent']
        confidence = rule['confidence']
        lift = rule['lift']
        certainty = rule['certainty']
        conviction = rule['conviction']
        leverage = rule['leverage']
        print(f"\nRule {i}: {antecedent} → {consequent}")
        print(f"  Confidence: {confidence:.1%}")
        print(f"  Lift: {lift:.2f}")
        print(f"  Certainty: {certainty:.2f}")
        print(f"  Conviction: {conviction:.2f}")
        # Business interpretation
        if lift > 1 and confidence > 0.5:
            print(f"  Strong association: buying {antecedent} significantly increases the chance of buying {consequent}")
        if certainty > 0.6:
            print("  ✅ High certainty: the association works in both directions")
        if conviction > 1.5:
            print("  High conviction: the rule rarely fails")
# 5. Rank rules by different metrics
print("\n5. Best rules by each metric")
print("-" * 40)
# Sort by confidence
print("Top rules by confidence:")
rules_extended.orderBy("confidence", ascending=False).limit(3).show(truncate=False)
# Sort by lift
print("\nTop rules by lift:")
rules_extended.orderBy("lift", ascending=False).limit(3).show(truncate=False)
# Sort by certainty
print("\nTop rules by certainty:")
rules_extended.orderBy("certainty", ascending=False).limit(3).show(truncate=False)
# Sort by conviction (excluding infinite values)
print("\nTop rules by conviction (finite values only):")
rules_extended.filter(col("conviction") < float('inf')) \
    .orderBy("conviction", ascending=False) \
    .limit(3).show(truncate=False)
# 6. Composite scoring system
print("\n6. Composite scoring system")
print("-" * 40)
# Compute a composite score
def calculate_composite_score(confidence, lift, certainty, conviction):
    """Weighted composite score across four metrics."""
    score = (confidence * 0.4 +               # confidence: 40% weight
             min(lift, 5) / 5 * 0.3 +         # lift: 30% weight (capped at 5)
             certainty * 0.2 +                # certainty: 20% weight
             min(conviction, 10) / 10 * 0.1)  # conviction: 10% weight (capped at 10)
    return score
# Register the UDF
from pyspark.sql.types import DoubleType
composite_score_udf = udf(calculate_composite_score, DoubleType())
rules_scored = rules_extended.withColumn(
"composite_score",
composite_score_udf("confidence", "lift", "certainty", "conviction")
)
print("综合评分最高的规则:")
rules_scored.orderBy("composite_score", ascending=False).limit(5).show(truncate=False)
# 7. Advanced analysis: rule classification
print("\n7. Rule classification")
print("-" * 40)
# Classify rules by their metrics
def classify_rule(confidence, lift, certainty):
    """Assign a category to a rule based on its metrics."""
    if confidence > 0.7 and lift > 2:
        return "Strong recommendation"
    elif confidence > 0.5 and lift > 1.5:
        return "Recommendation"
    elif confidence > 0.3 and lift > 1:
        return "Ordinary rule"
    else:
        return "Weak rule"
classify_udf = udf(classify_rule, StringType())
rules_classified = rules_scored.withColumn(
"rule_category",
classify_udf("confidence", "lift", "certainty")
)
# Count rules per category
category_stats = rules_classified.groupBy("rule_category").count().orderBy("count", ascending=False)
print("Rule classification statistics:")
category_stats.show()
# 8. Visualization
print("\n8. Metric correlation analysis")
print("-" * 40)
try:
    import matplotlib.pyplot as plt
    # Convert to a pandas DataFrame for analysis
    rules_pd = rules_extended.toPandas()
    # Replace infinite conviction values, then cap the maximum
    rules_pd['conviction'] = rules_pd['conviction'].replace([float('inf')], 100)
    rules_pd['conviction'] = rules_pd['conviction'].clip(upper=10)
    # Build the correlation matrix
    metrics_columns = ['confidence', 'lift', 'certainty', 'conviction', 'leverage', 'coverage']
    correlation_matrix = rules_pd[metrics_columns].corr()
    fig, axes = plt.subplots(2, 3, figsize=(15, 10))
    # 1. Confidence distribution
    axes[0, 0].hist(rules_pd['confidence'], bins=10, edgecolor='black', alpha=0.7)
    axes[0, 0].set_xlabel('Confidence')
    axes[0, 0].set_ylabel('Number of rules')
    axes[0, 0].set_title('Confidence distribution')
    # 2. Lift distribution
    axes[0, 1].hist(rules_pd['lift'], bins=10, edgecolor='black', alpha=0.7, color='green')
    axes[0, 1].set_xlabel('Lift')
    axes[0, 1].set_ylabel('Number of rules')
    axes[0, 1].set_title('Lift distribution')
    # 3. Certainty distribution
    axes[0, 2].hist(rules_pd['certainty'], bins=10, edgecolor='black', alpha=0.7, color='orange')
    axes[0, 2].set_xlabel('Certainty')
    axes[0, 2].set_ylabel('Number of rules')
    axes[0, 2].set_title('Certainty distribution')
    # 4. Conviction distribution
    axes[1, 0].hist(rules_pd['conviction'], bins=10, edgecolor='black', alpha=0.7, color='red')
    axes[1, 0].set_xlabel('Conviction')
    axes[1, 0].set_ylabel('Number of rules')
    axes[1, 0].set_title('Conviction distribution')
    # 5. Confidence vs. lift scatter plot
    axes[1, 1].scatter(rules_pd['confidence'], rules_pd['lift'], alpha=0.6)
    axes[1, 1].set_xlabel('Confidence')
    axes[1, 1].set_ylabel('Lift')
    axes[1, 1].set_title('Confidence vs. Lift')
    # 6. Correlation heatmap
    im = axes[1, 2].imshow(correlation_matrix.values, cmap='coolwarm', vmin=-1, vmax=1)
    axes[1, 2].set_xticks(range(len(metrics_columns)))
    axes[1, 2].set_yticks(range(len(metrics_columns)))
    axes[1, 2].set_xticklabels(metrics_columns, rotation=45)
    axes[1, 2].set_yticklabels(metrics_columns)
    axes[1, 2].set_title('Metric correlation heatmap')
    # Add a colorbar
    plt.colorbar(im, ax=axes[1, 2])
    plt.tight_layout()
    plt.savefig('fp_growth_extended_metrics.png', dpi=150, bbox_inches='tight')
    print("Chart saved as 'fp_growth_extended_metrics.png'")
    # Print the correlation matrix
    print("\nMetric correlation matrix:")
    print(correlation_matrix.round(2))
except ImportError:
    print("matplotlib is required for the visualizations")
    print("Install with: pip install matplotlib")
# 9. Save the full results
print("\n9. Save analysis results")
print("-" * 40)
# CSV cannot store array columns, so join antecedent/consequent into strings first
from pyspark.sql.functions import concat_ws
rules_scored.withColumn("antecedent", concat_ws(",", "antecedent")) \
    .withColumn("consequent", concat_ws(",", "consequent")) \
    .coalesce(1).write.csv(
        "fp_growth_complete_rules.csv",
        header=True,
        mode="overwrite"
    )
print("✓ Complete rules saved to fp_growth_complete_rules.csv")
# Report summary
print("\n" + "=" * 80)
print("Analysis Report Summary")
print("=" * 80)
total_rules = rules_extended.count()
high_confidence_rules = rules_extended.filter(col("confidence") > 0.7).count()
high_lift_rules = rules_extended.filter(col("lift") > 2).count()
high_certainty_rules = rules_extended.filter(col("certainty") > 0.6).count()
print(f"总规则数: {total_rules}")
print(f"高置信度规则 (>70%): {high_confidence_rules} ({high_confidence_rules/total_rules:.1%})")
print(f"高提升度规则 (>2): {high_lift_rules} ({high_lift_rules/total_rules:.1%})")
print(f"高确定性规则 (>0.6): {high_certainty_rules} ({high_certainty_rules/total_rules:.1%})")
# 业务提议
print("
业务提议:")
print("1. 关注置信度和提升度都高的规则进行营销")
print("2. 确定性高的规则适合双向推荐")
print("3. 确信度高的规则更可靠,适合作为核心策略")
# Stop Spark
spark.stop()
print("\n" + "=" * 80)
print("Analysis complete!")
print("=" * 80)
Simplified Version: Computing Only certainty and conviction
import sys
import os
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import split, col, udf
from pyspark.sql.types import ArrayType, DoubleType
spark = SparkSession.builder.master("local").getOrCreate()
# Simple data
data = [(1, "milk,bread"), (2, "bread,beer"), (3, "milk,bread,beer"),
(4, "bread,butter"), (5, "milk,bread,butter")]
df = spark.createDataFrame(data, ["id", "items"])
df = df.withColumn("items", split("items", ","))
# Run FP-Growth
fp = FPGrowth(itemsCol="items", minSupport=0.3, minConfidence=0.5)
model = fp.fit(df)
# Gather support information
total_transactions = df.count()
support_dict = {}
for row in model.freqItemsets.collect():
itemset_key = tuple(sorted(row['items']))
support_dict[itemset_key] = row['freq'] / total_transactions
# Function to compute certainty and conviction
def calc_certainty_conviction(antecedent, consequent, confidence):
antecedent_key = tuple(sorted(antecedent))
consequent_key = tuple(sorted(consequent))
union_key = tuple(sorted(set(antecedent).union(consequent)))
support_antecedent = support_dict.get(antecedent_key, 0)
support_consequent = support_dict.get(consequent_key, 0)
support_union = support_dict.get(union_key, 0)
    # Certainty: average of the two directional confidences
    if support_consequent > 0:
        confidence_reverse = support_union / support_consequent
    else:
        confidence_reverse = 0
    certainty = (confidence + confidence_reverse) / 2
    # Conviction: infinite for perfect rules (confidence == 1)
    if confidence >= 1:
        conviction = float('inf')
    else:
        conviction = (1 - support_consequent) / (1 - confidence)
    return (float(certainty), float(conviction))
# Register the UDF
calc_metrics_udf = udf(calc_certainty_conviction,
returnType=ArrayType(DoubleType()))
# Append the metrics
rules_with_extra = model.associationRules.withColumn(
"extra_metrics",
calc_metrics_udf("antecedent", "consequent", "confidence")
).select(
"antecedent",
"consequent",
"confidence",
"lift",
col("extra_metrics")[0].alias("certainty"),
col("extra_metrics")[1].alias("conviction")
)
print("带有certainty和conviction指标的规则:")
rules_with_extra.show(truncate=False)
spark.stop()
Detailed Metric Explanations
1. Certainty
certainty = (confidence(A→B) + confidence(B→A)) / 2
- Range: 0–1
- Meaning: measures the bidirectional credibility of the rule (see the worked example below)
- Interpretation:
  - >0.7: strong certainty
  - 0.5–0.7: moderate certainty
  - <0.5: weak certainty
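As a quick sanity check, here is a worked example with invented support values (support(A)=0.4, support(B)=0.5, support(A∪B)=0.3; the numbers are purely illustrative):

```python
sup_a, sup_b, sup_ab = 0.4, 0.5, 0.3      # hypothetical supports, for illustration only
conf_ab = sup_ab / sup_a                  # confidence(A→B) = 0.75
conf_ba = sup_ab / sup_b                  # confidence(B→A) = 0.60
certainty = (conf_ab + conf_ba) / 2       # (0.75 + 0.60) / 2 = 0.675
print(certainty)                          # 0.675 → moderate certainty (0.5–0.7)
```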
2. Conviction
conviction = (1 - support(B)) / (1 - confidence(A→B))
- Range: 0–∞
- Meaning: measures how often the rule would make a wrong prediction; higher means fewer errors (see the worked example below)
- Interpretation:
  - ∞: perfect rule (never wrong)
  - >1.5: high-quality rule
  - 1.0: no better than random
  - <1.0: worse than random guessing
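Continuing with the same invented numbers as in the certainty example:

```python
sup_b = 0.5                               # hypothetical support(B)
conf_ab = 0.75                            # hypothetical confidence(A→B)
conviction = (1 - sup_b) / (1 - conf_ab)  # 0.5 / 0.25 = 2.0
print(conviction)                         # 2.0 → high-quality rule (>1.5)
```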
3. Other Available Metrics
# Leverage
leverage = support(A∪B) - support(A) × support(B)
# Coverage
coverage = support(A)
# Added Value
added_value = confidence(A→B) - support(B)
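All three fall out of the same support lookups; a minimal runnable sketch using the same invented supports as above:

```python
sup_a, sup_b, sup_ab = 0.4, 0.5, 0.3    # hypothetical supports, for illustration only

leverage = sup_ab - sup_a * sup_b        # 0.3 - 0.4 * 0.5 = 0.10
coverage = sup_a                         # 0.4
added_value = sup_ab / sup_a - sup_b     # confidence(A→B) - support(B) = 0.25

print(leverage, coverage, added_value)
```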
Summary
Although Spark's FP-Growth does not provide certainty or conviction out of the box, both are easy to derive from the support and confidence values it already computes. These extended metrics give a more complete picture of rule quality and help you make better business decisions!