在人大DMC学习的时候获得了一批某公司音乐下载log数据,有7天的日访问log文件,共7个文件,每个文件大概1.2G,还有一个是mp3_rid_info.txt,是音乐id对应的歌曲信息。
数据格式如下:
数据示例如下:
_qQaHMAb9aH{]{]{]20110701 000000{]v{]48544534qQaHxxNa84{]{]HQaHxvNwoA{]20110701 000000{]v{]4899081VQS55L0lfa2HqQajg3zOQEO3z3GQ33zQ3{]{]VQS5WL2v_wDNfojjj3z3O2O3z3GQ33z2O{]20110701 000000{]v{]48544534qQaHtCw7gK{]{]4qQaHtCw7gK{]20110701 000000{]v{]109720HqQaHYE14lp{]{]7aQaHojc6ju{]20110701 000000{]d{]12947091VQS5WLwGxWg4qQajV3zOQEO3zG2QGGz2O{]{]VQS5WLwGxWg4qQajV3zOQEO3zG2QGGz2O{]20110701 000000{]v{]95073Y1VqQajs80L5Jj-{]{]VQS5WLKQwrW5COajI3zVGVO3zOOZE3z2O{]20110701 000000{]v{]2652954KM4qQajwobL5YDq{]{]XLueQaj7obL5bjt{]20110701 000000{]d{]19171739VQS5WLWu4VE2qQajK3zOQEO3z_3QQ2z2O{]{]VQS5WLoqjIHFKQaj93zOQEO3z_3QQ2z2O{]20110701 000000{]s{]刘承俊4qQaHiEnsi5{]{]zqQaHFFxBwL{]20110701 000000{]d{]5922248VQS5WL-yEGT4qQajT3zOQEO3zVO3ez2O{]{]gdAB1c3a4x4sheoOam{]20110701 000000{]d{]117764653qHqQajc80L5bsV{]{]3qHqQajc80L5bsV{]20110701 000000{]v{]4904109VQS5WLuuo-04qQajQ3zOQEO3zO_GOGz2O{]{]VQS5WLz8N7CfqQaj13zOQEO3zO_GOGz2O{]20110701 000000{]s{]黄家驹yxHqQajB80L5oJB{]{]1lDNFjjm80L5w7y{]20110701 000000{]v{]1899686IqQaHJi6YbK{]{]IqQaH3wxlaU{]20110701 000000{]s{]厉志歌曲v-n3EqQa4eerAqmQaB{]{]{]20110701 000000{]v{]1023U9fqQajfoxyi2mJ{]{]dZbr3aj2oxyixcc{]20110701 000000{]v{]34593647G2SjP05164qQaarG3_OG3O3oWLW7b{]{]X6GaHRq8VrA{]20110701 000000{]v{]1937019p84qQajpobL54O0{]{]DFIqQaj5obL5lmQ{]20110701 000000{]d{]18900639H64qQajgobL5BZv{]{]pHc1Qaj7obL5xfz{]20110701 000000{]d{]64458414qQaHVaIRa5{]{]4qQaHVaIRa5{]20110701 000000{]d{]12067314VQS05L4JUDr4qQajP3zOQEO3zZ33Ez_O{]{]VQS05L4JUDr4qQajP3zOQEO3zZ33Ez_O{]20110701 000000{]d{]36028283VQS55LNqP-GnqQaj83zOQEO3zVO3ezQ3{]{]ThlE3ajl80L5NKt{]20110701 000000{]s{]刀郎hqQaHwOcF9M{]{]_qQaHnEIQ2B{]20110701 000000{]d{]303899044qQaHilUFEB{]{]U4JjHXDdSCQ{]20110701 000000{]d{]36227787rZnqQajFoxyi2ZA{]{]n3tcQajtoxyiKfX{]20110701 000000{]v{]41267794qQaHKpbaNa{]{]4qQaHKpbaNa{]20110701 000000{]s{]我这个你不爱的人+迪克牛VQS5WLZDENS4qQajU3zOQEO3zOG2E3z2O{]{]VQS5WLNCYrbHqQajb3zOQEO3zOG2E3z2O{]20110701 000000{]s{]最幸福的人fqQaHEv1cpS{]{]BpQaHjADVEj{]20110701 000000{]s{]黄小琥4qQaHDUEMXY{]{]CBvAJKQa4ecVpsQa0{]20110701 000000{]s{]少女时代jJucbdfqQaad80L5aCn_-u{]{]jJucbdfqQaad80L5aCn_u{]20110701 000000{]v{]1023sELFnqQa4480JOvQax{]{]sELFnqQa4480JOvQax{]20110701 000000{]v{]1993325
这些数据如何用来做分析呢,我考虑了一下,可以做推荐、用户活跃度变化的分析、歌曲或者用户的聚类。不过,刚拿到数据的时候,我也没想到这么多,正好当时在学习频繁项集,就拿这个来练习吧。由于我比较习惯用python作数据分析,就选择python了。
条件:我手头的机器不是很给力,ubuntu的虚拟机,32bit,从CPU为E6600虚拟的主机出来一个核,512MB内存。但是我还是想试试看,7天的数据难处理,就先处理一天的数据。
预处理 将同样session的所有项集放在一起,作为一个“购物车”。 编程目标:从大量的log信息中将同一session的下载歌曲的id归类。# coding=UTF-8import reimport sysimport fileinputimport inspectfrom pymongo import Connectionimport bsonreload(sys)sys.setdefaultencoding("utf-8") linereg=re.compile(r"([^ ]+)\{\](\d*)\{\]([^ ]*)\{\](\d{8} \d{6})\{\]([dsv])\{\]([^ ]+)") class recordItem:#记录类,包含各字段 def __init__(self,*groups): self.sessionid,self.phone,self.uid,self.time,self.typ,self.value=groups try: self.value=self.value.decode("utf-8") except UnicodeDecodeError: try: self.value=self.value.decode("gbk") except UnicodeDecodeError: self.value=self.value class visitLogFile():#该类为一个生成器,每个元素即为每个记录 def __init__(self,filename): self.fd=fileinput.input(filename) def close(self): self.fd.close() def __iter__(self): for line in self.fd: if line: line=line.rstrip("\n") line=line.strip() m=re.match(linereg,line) if not m: try: line=line.decode("utf-8") except UnicodeDecodeError: try: line=line.decode("gbk") except UnicodeDecodeError: print "shit!",fileinput.lineno() print line,fileinput.lineno() else: try: record=recordItem(*m.groups()) yield record except GeneratorExit: pass except Exception as e: print "GENERATOR ERROR:",line,fileinput.fileno() def prop(obj): pr={} for name in dir(obj): value=getattr(obj,name) if not name.startswith("__") and not inspect.ismethod(value): pr[name]=value return pr if __name__ == "__main__": conn=Connection() db=conn.easou collection=db.visit vlf=visitLogFile("visit.txt.20110701.2")#以文件名作为参数 for item in vlf:#遍历生成器,并将每条记录写进mogodb try: collection.insert(prop(item)) except bson.errors.InvalidStringData: print "Encode Error",item vlf.close()
失败原因:数据库大于2G,而我的系统是32bit的,32bit的系统最多只能在mongodb里面存放2G的数据库。
2.shell管道流方案
这里可以借鉴mapreduce的工作原理,先将同样session id的记录归类,然后将它们收集起来,形成一个一个“购物车”的形式。
import sysimport rereload(sys)sys.setdefaultencoding("utf-8") linereg=re.compile(r"([^ ]+)\{\](\d*)\{\]([^ ]*)\{\](\d{8} \d{6})\{\]([dsv])\{\]([^ ]+)")#匹配字符串 def read_input(file): for line in file: line=line.strip() if not line=="": m=re.match(linereg,line) if m: match=m.groups() if match[4]=="d": try: value=match[5].decode("utf-8") except UnicodeDecodeError: try: value=match[5].decode("gbk") except UnicodeDecodeError: value=match[5] yield match[0]+"\t"+value#输出session id与歌曲id input=read_input(sys.stdin) for item in input: print item
用法:cat visit.txt.2011xxxx.2 | python mapvisit.py | sort > sorted.xxxx.txt
(2) reducer,生成项集
将刚才获取的已经排好序的记录进行归类就方便多了,只要用sys.stdin逐行扫描,若session与前一行相同,则加入容器,否则输出容器里面所有的id(用逗号分开),并清空容器
import sys def read_input(file): for line in file: line=line.rstrip() yield line input=read_input(sys.stdin)prev=""#存放前一个记录的session idcollection=[]#用于临时存放统一购物车的项的容器for item in input: groups=item.split("\t") session=groups[0] value=groups[1] if not session==prev:#如果与前一个记录的session id不一样,那么输出并把容器清空 if not len(collection)==0: coll=set(collection) coll=",".join([x for x in coll]) print coll collection=[] collection.append(value)#将当前记录放入容器 prev=sessionif not len(collection)==0:#最后的处理 coll=set(collection) coll=",".join([x for x in coll]) print coll
用法:cat sorted.xxxx.txt | python genCollection.py > ck.xxxx.txt
这样输出的文件就是一个个“购物车”了,示例如下,每一行代表一个“购物车”,由歌曲的id构成,用“,”分隔:
2582147123888779,2388878019323097130052422083708126011932303899101768218913014949,25704721,119571388865282120724265180610657088830389910,877099025724699856127115451360,163868681761828636186443224697621151347136151688123003871204100036168455631848113018096,33361116,20135287,3038991236314621,8254907,7741279,301796,36481093,257754003647853336484454,36488370,36484452973745636492246362830453643545822033394362633223648628720868410
生成C1及其频数
接下来就可以对购物车进行Apriori分词了。其实这个过程自动化生成Ck,并扫描就可以了,不过为了观察从小到大的各元祖的频繁度,还是一步一步来吧。如果支持度设置过高,可能都无法生成频繁的二元组,如果设置过低,可能需要机器跑好长时间才能出结果。
方案一: 扫描一遍整个“购物车”数据集,提取出C1。 再次扫描一遍数据集,扫描每个“购物车”时,将C1中的元素逐个判断,是否是该“购物车”的子集,如果是,则将相应的C1对应的出现次数加1 缺点:C1较多,耗时较长import sysfrom operator import itemgetter def read_input(file): for line in file: line=line.rstrip() yield line C1={}#用于存放各一元组及其频数input=read_input(sys.stdin)for line in input: transaction=line.strip().split(",") if not len(transaction)==0: for item in transaction: if not C1.has_key(item): C1[item]=1 else: C1[item]+=1 sCnt=sorted(C1.iteritems(), key=itemgetter(1), reverse=True)#按照字典的值进行排序for item in sCnt: print item[0]+"\t"+str(item[1])
用法:cat ck.xxxx.txt | python genC1num.py > C1num.py
用Apriori算法生成Ck,选出频繁项
import sysfrom operator import itemgetter def genCandidate(F):#通过满足支持度的Ck-1项集生成候选的Ck项集 C=[] k=len(F[0])+1 print "k="+str(k) length=len(F) for i in range(length): for j in range(i+1,length): L1=list(F[i])[:k-2] L2=list(F[j])[:k-2] L1.sort() L2.sort() if L1==L2: C.append(F[i]|F[j]) return C def scanD(D,Ck):#扫描每一“购物车”,统计每一候选项集出现的频率 ssCnt={} i=0 for tid in D: i+=1 for can in Ck: if can.issubset(tid): if not ssCnt.has_key(can): ssCnt[can]=1 else: ssCnt[can]+=1 if i%1000==0:#用于观察进度 print str(i)+" lines scaned!" sCnt=sorted(ssCnt.iteritems(), key=itemgetter(1), reverse=True) return sCnt,ssCnt def read_input(file): for line in file: line=line.rstrip() yield line.split(",") fd=open("C2num.txt","r")#操作Ck-1项集的文件,可以按照需要修改文件名ck1=[]#存放Ck-1项集while True: line=fd.readline() if not line: break item=line.split("\t") if int(item[1])<487: break ck1.append(item[0].split(",")) ck1=map(frozenset,ck1)ck=genCandidate(ck1)fd.close()print "Length of Ck is "+str(len(ck))print "Load Ck completely!" input=read_input(sys.stdin)sCnt,ssCnt=scanD(input,ck) fdout=open("C3num.txt","w")#生成Ck项集的文件,可以按照需要修改文件名for item in sCnt: ss="" for i in item[0]: ss+=i+"," ss=ss.rstrip(",") ss+="\t"+str(item[1])+"\n" fdout.write(ss)fdout.close()
用法:cat ck.xxxx.txt| python apriori.py > C3num.txt
关联规则抽取
def loadCk(filename,supportData):#加载Ck的函数 Ck=[] fd=open(filename,"r") while True: line=fd.readline() if not line:break line=line.rstrip() item=line.split("\t") if int(item[1])<487:break Ck.append(item[0].split(",")) supportData[frozenset(item[0].split(","))]=int(item[1]) return map(frozenset,Ck) def generateRules(L,supportData):#抽取关联规则的函数 bigRuleList=[] for i in range(1,len(L)): for freqset in L[i]: H1=[frozenset([item]) for item in freqset] calcConf(freqset,H1,supportData,bigRuleList) def calcConf(freqset,H,supportData,bigRuleList): for conseq in H: conf=float(supportData[freqset])/supportData[freqset-conseq] bigRuleList.append((freqset-conseq,conseq,conf)) if conf>0.1:#可信度的阈值为0.1,可以按照需求改变 print ",".join(freqset-conseq)+"\t"+",".join(conseq)+"\t"+str(conf) #print freqset-conseq+"\t"+conseq+"\t"+conf retlist=[]supportData={}retlist.append(loadCk("C1num.txt",supportData))#一元组的加载retlist.append(loadCk("C2num.txt",supportData))#二元组的加载 generateRules(retlist,supportData)
用法:python relationExtraction.py > relation.txt
抽取的关联规则如下(左边->右边 信任度):
36435459 36455065 0.10008169934636259037 26032040 0.10042083877536435458 36455064 0.10211088504636314621 36163849 0.10286382232636314622 36488369 0.10325123152736455066 36435460 0.10419397116636314621 36488368 0.10824079485736314623 36163851 0.1110004977636494430 36455066 0.11113368549436481096 36273013 0.11464803312636280476 36280477 0.11589329746736481094 36481093 0.1209246392336273013 36481096 0.12343271106236435460 36455066 0.12750601443536314623 36488370 0.13539074166330389910 30389896 0.14520676691730389896 30389910 0.15919629057235979647 26032038 0.17888563049917818175 36314621 0.17929292929317818177 36314623 0.18546195652236280477 36280476 0.19546313799636280476 36163849 0.21990585070636280477 36163851 0.23969754253336481093 36481094 0.24720021852
思考
从大量的数据中抽取的关联规则特别少,原因是同一session id下载的歌曲很多都是只有一首歌。是不是应该考虑不以session作为单位进行频繁项集的抽取,而是以用户作为单位进行抽取。而且,有些id对应同一首歌,这样同样会被抽取为关联度较大的规则,这是没有意义的,作为噪声需要避免。如果是处理多天的数据,可能就需要多台机器并行处理了,针对此还需要稍微改进一下现在的算法。
同时,关联规则的抽取只是一个小的方面,还有很多方面可以对这些数据进行抽取,期待以后的工作能将此做的更好。
个人博客地址:
新浪微博:
欢迎读者交流讨论并提出宝贵意见。