import osimport emailimport email.policy
1. 读取邮件数据
SPAM_PATH = os.path.join( "E:\\3.Study\\机器学习\\Hand on Machine Learning\\第三章:分类\\spam_data")spam_path = os.path.join(SPAM_PATH, "spam")ham_path = os.path.join(SPAM_PATH, "easy_ham")spam_list = [name for name in os.listdir(spam_path) if len(name) > 20]ham_list = [name for name in os.listdir(ham_path) if len(name) > 20]def load_email(is_spam, filename, spam_path=SPAM_PATH): directory = "spam" if is_spam else "easy_ham" with open(os.path.join(spam_path, directory, filename), "rb") as f: return email.parser.BytesParser(policy=email.policy.default).parse(f) # email.message.EmailMessage'类型,没法用list.append接收 # return email.parser.BytesParser(policy=email.policy.default).parse(f)# 这里有类型问题,应该记住这种加载email文件的形式。尝试list.append添加数据,加入的数据是generator类型,非email类型ham_emails = [load_email(is_spam=False, filename=name) for name in ham_list]spam_emails = [load_email(is_spam=True, filename=name) for name in spam_list]print(spam_emails[0].get_content().strip())
2.分析邮件结构
def get_email_structure(email): # isinstance 函数:判断一个对象是否是已知类型。第一个参数为对象,第二个参数为类型名或者是类型名的列表。返回True/False if isinstance(email, str): return email # get_pyload()函数:返回当前邮件的正文。 # 如果正文含有多个部分的话(is_multipart=True),返回一个message对象的list列表; # 如果is_multipart=False,即正文没有多部份的话,返回一个string类型。 payload = email.get_payload() if isinstance(payload, list): return "multipart({})".format(", ".join([ get_email_structure(sub_email) for sub_email in payload ])) else: return email.get_content_type()from collections import Counterdef structures_counter(emails): # Counter类的目的是用来跟踪值出现的次数 structures = Counter() for email in emails: structure = get_email_structure(email) structures[structure] += 1 return structuresarray = structures_counter(ham_emails).most_common()array2 = structures_counter(spam_emails).most_common()print(array)print(array2)
3.分析邮件头部
for head, value in spam_emails[0].items(): print(head, ":", value)print(spam_emails[0]["Subject"])
4. 划分训练集,测试集
import numpy as npX = np.array(ham_emails+spam_emails) # ham_emails和span_emails是list类型Y = np.array([0]*len(ham_emails)+[1]*len(spam_emails))from sklearn.model_selection import train_test_splitx_train, x_test, y_train, y_test = train_test_split( X, Y, test_size=0.33, random_state=42)
5. 邮件文本预处理(转换HTML)
import re # regular expressions(正则)from html import unescapedef html_to_plain_text(html): # sub->substitute(替换) # 参数1:pattern 正则; # 参数2:repl:replacement,被替换的字符串/函数 # 参数3: string:需要被处理的内容 # 参数4: count: 匹配的数目 如果正则表达式在string中有多个匹配结果,count控制匹配的数目 # 参数5: flag : 匹配模式 # re.I 匹配对大小写不敏感 # re.M 多行匹配(以行为单位匹配) # re.S 使 . 匹配包括换行在内的所有字符 # ->用pattern模式将string里面count个的字符换成repl text = re.sub(' .*?', '', html, flags=re.M | re.S | re.I) text = re.sub(' ', 'HYPERLINK', text, flags=re.M | re.S | re.I) text = re.sub('<.*?>', '', text, flags=re.M | re.S) text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S) return unescape(text)html_spam_emails = [email for email in x_train[y_train == 1] if get_email_structure(email) == "text/html"]sample_html_spam = html_spam_emails[2]# 输出html辣鸡邮件的前1000个字符,strip()->去除首尾空格print(sample_html_spam.get_content().strip()[:1000], "...")print("-"*30)print(html_to_plain_text(sample_html_spam.get_content())[:1000], "...")
6.转换所有邮件为文本
def email_to_text(email): html = None # email->part->part.part 以树的结构存储,walk()用来循环遍历各个树及其子树 for part in email.walk(): ctype = part.get_content_type() if not ctype in ("text/plain", "text/html"): continue # 跳过不是以上两种类型的部分 try: content = part.get_content() except: content = str(part.get_payload()) if ctype == "text/plain": return content else: html = content if html: return html_to_plain_text(html)print(email_to_text(sample_html_spam)[:100], "...")
7. 自然语言处理
try: import nltk stemmer = nltk.PorterStemmer() # 建立一个波特词干算法(分析单词的词干) for word in ("Conputations", "Computation", "Computing", "Computed", "Compulsive"): print(word, "=>", stemmer.stem(word))except ImportError: print("Error: stemming requires the NLTK module.") stemmer = None
8.URL识别
try: import urlextract url_extracror = urlextract.URLExtract() print(url_extracror.find_urls("will it detect gitub.com and https://www.google.com/search?ei=nqXjXL2VM5vqwQPks4rQAw&q=python+nltk&oq=python+nltk&gs_l=psy-ab.3..0j0i203l2j0j0i203l6.1867661.1868738..1869035...0.0..0.311.750.0j3j0j1......0....1..gws-wiz.......0i71j0i67.eLLHBxPtulQ"))except ImportError: print("Error:url_extracror requires the urlextract module.") urlextract = None
9. 对邮件内的所有单词进行计数
from sklearn.base import BaseEstimator, TransformerMixinclass EmailToWordCounterTransformer(BaseEstimator, TransformerMixin): def __init__(self, strip_headers=True, low_case=True, remove_punctuation=True, repalce_urls=True, replace_numbers=True, stemming=True): self.strip_headers = strip_headers self.low_case = low_case self.remove_punctuation = remove_punctuation self.replace_urls = repalce_urls self.replace_numbers = replace_numbers self.stemming = stemming def fit(self, X, Y=None): return self def transform(self, X, Y=None): X_transform = [] for email in X: text = email_to_text(email) or "" if self.low_case: text = text.lower() if self.replace_urls and url_extracror is not None: # list(set()) 创建一个不重复的元素集 urls = list(set(url_extracror.find_urls(text))) urls.sort(key=lambda url: len(url), reverse=True) # 根据url的长度对url进行排序 for url in urls: text = text.replace(url, "URL") # 用“URL”换所有真实的url if self.replace_numbers: # 将所有数字转换为NUMBER字符 text = re.sub(r'\d+(?:\.\d*(?:[eE]\d+))?', 'NUMBER', text) if self.remove_punctuation: # 删除所有标点符号 text = re.sub(r'\W+', ' ', text, flags=re.M) # \W 匹配任何非单词字符 # Counter()返回一个特殊的字典,包含单词种类和单词数量。eg:{"a":3,"b""2} word_count = Counter(text.split()) if self.stemming and stemmer is not None: stemmed_word_counts = Counter() for word, count in word_count.items(): # 分析单词的词干,统计词干的数量 stemmed_word = stemmer.stem(word) stemmed_word_counts[stemmed_word] += count word_count = stemmed_word_counts X_transform.append(word_count) # 将每个邮件的字符字典存到list中 return np.array(X_transform)X_few = x_train[:3]X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)print(X_few_wordcounts)from scipy.sparse import csr_matrix # 压缩稀疏行矩阵class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin): def __init__(self, vocabulary_size=100): self.vocabulary_size = vocabulary_size def fit(self, X, Y=None): total_count = Counter() for word_count in X: for word, count in word_count.items(): # X是上个函数内的字典,不是X数据集 total_count[word] += min(count, 10) # 次数超过10的存10 # most_common 字典里面出现次数最多的.当most_common没有参数时,返回字典所有的item,从大到小排列 # 查看前vocabulaty_size个出现次数最多的 most_common = total_count.most_common()[:self.vocabulary_size] self.most_common_ = most_common # most_commoon [('number', 15), ('i', 7), ('the', 7), ('url', 7), ('to', 4), ('chri', 3), ('wa', 3), ('from', 3), ('list', 3), ('of', 3)] # 将most_common里面的出现频率最多的词从多到少依次排序,返回{(单词,序号)} self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)} # vocabulary {'number': 1, 'i': 2, 'the': 3, 'url': 4, 'to': 5, 'chri': 6, 'wa': 7, 'from': 8, 'list': 9, 'of': 10} return self def transform(self, X, Y=None): rows = [] cols = [] data = [] for row, word_count in enumerate(X): for word, count in word_count.items(): rows.append(row) cols.append(self.vocabulary_.get(word, 0)) data.append(count) return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size+1))vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)print(X_few_wordcounts)print(X_few_vectors.toarray())print(vocab_transformer.vocabulary_)
10.训练分类器
from sklearn.pipeline import Pipeline # 创建流水线处理preprocess_pipeline = Pipeline([ ("email_to_wordcount", EmailToWordCounterTransformer()), ("wordcount_to_vector", WordCounterToVectorTransformer()),])X_train_transformed = preprocess_pipeline.fit_transform(x_train)from sklearn.linear_model import LogisticRegressionfrom sklearn.model_selection import cross_val_scorelog_clf = LogisticRegression()score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3, n_jobs=-1)print(score.mean())
11.评估分类器
from sklearn.metrics import precision_score, recall_scoreX_test_transformed = preprocess_pipeline.transform(x_test)# solver 优化算法的参数,包括newton-cg,lbfgs,liblinear,sag,saga,对损失的优化的方法log_clf2 = LogisticRegression(solver="liblinear", random_state=42)log_clf2.fit(X_train_transformed, y_train)y_pred = log_clf2.predict(X_test_transformed)print(precision_score(y_test, y_pred))print(recall_score(y_test, y_pred))from sklearn.naive_bayes import MultinomialNBmnb = MultinomialNB()mnb.fit(X_train_transformed, y_train)mnb_y_pred = mnb.predict(X_test_transformed)print(precision_score(y_test, mnb_y_pred))print(recall_score(y_test, mnb_y_pred))