work tools – ip log analysis and record
收录一个工作中使用的日志分析脚本，使用 Python 写成，简单明了，直接上代码：
#! /usr/bin/env python
# -*- coding: utf8 -*-
"""Record per-user login days from a daily iplog table into a monthly table.

For a given date (first command-line argument as ``YYYY-MM-DD``, default:
yesterday) the script pages through ``iplog_typeid_<YYYYMMDD>`` in batches,
groups the user ids by login type and queues ``insert ignore`` / ``update``
statements against ``user_login_<YYYYMM>``.  Each day of the month is stored
as one bit (``1 << (day - 1)``) in ``login_days`` and in the column of the
matching login type.  A pool of worker threads executes the queued SQL.
"""
import datetime
import sys
import threading
import time
import traceback
from threading import Thread

try:
    import Queue as q  # Python 2
except ImportError:
    import queue as q  # Python 3

import MySQLdb

import devintool as t

THREAD_NUM = 100            # number of SQL worker threads, also the queue capacity
BATCH_SIZE = 1000           # rows read from the iplog table per query
BATCH_PAUSE_SECONDS = 300   # pause between two consecutive full batches
clients = []                # app ids counted as "client" logins; filled in by the driver


class Conn(object):
    """Connection settings plus a lazily opened MySQLdb connection."""

    def __init__(self, host, port, db="fb", user="", passwd="", charset="utf8"):
        self.host = host
        self.port = port
        self.db = db
        self.user = user
        self.passwd = passwd
        self.charset = charset
        self.is_open = False
        self.__conn = None

    def open(self):
        """Connect with the stored settings and return the raw connection."""
        self.__conn = MySQLdb.connect(host=self.host, port=self.port,
                                      user=self.user, passwd=self.passwd,
                                      db=self.db, charset=self.charset)
        self.is_open = True
        return self.__conn

    def close(self):
        """Close the connection; a no-op when it was never opened."""
        if self.__conn is not None:
            self.__conn.close()
        self.is_open = False

    def cursor(self):
        """Return a new cursor on the open connection (``open()`` must be called first)."""
        return self.__conn.cursor()


# Serialises diagnostic output coming from the worker threads.
stdoutMutex = threading.Lock()


class mysqlThread(Thread):
    """Worker that executes SQL strings taken from a shared queue."""

    def __init__(self, host1, port1, db1, queue):
        Thread.__init__(self)
        self.queue = queue
        self.conn1 = Conn(host=host1, port=port1, db=db1)
        self.cursor1 = self.conn1.open().cursor()
        self.cursor1.execute('set autocommit=1')
        # Not named ``_stop``: on Python 3 that would shadow the internal
        # Thread._stop method and break join().
        self._stop_event = threading.Event()

    def run(self):
        """Execute queued statements forever; intended to run as a daemon thread."""
        print("run")
        while True:
            sql = self.queue.get()
            try:
                if not self.cursor1:
                    self.cursor1 = self.conn1.open().cursor()
                    self.cursor1.execute('set autocommit=1')
                self.cursor1.execute(sql)
            except Exception:
                # A failing statement must not kill the worker; otherwise the
                # bounded queue fills up and the producer blocks forever.
                with stdoutMutex:
                    traceback.print_exc()
            finally:
                # Always acknowledge the item so queue.join() can return.
                self.queue.task_done()


class iplogThread(Thread):
    """Producer that reads one day of iplog rows and queues the login SQL."""

    # Login type value -> column of user_login_<YYYYMM>.
    _TYPE_TO_COLUMN = {0: "web", 1: "wap", 3: "im", 4: "im_click"}
    # Order in which the per-column statements are queued.
    _COLUMN_ORDER = ("web", "wap", "im", "client", "im_click", "others")

    def __init__(self, queue, date):
        Thread.__init__(self)
        self.queue = queue
        self.date = date
        self.conn = Conn(host="log1.kdb.d.kaixin.com", port=3306, db="fb")
        self.cursor = self.conn.open().cursor()

    def run(self):
        """Page through the iplog table by ascending id until it is exhausted."""
        user_cnt = 0
        id_begin = 0
        while True:
            print(datetime.datetime.now())
            user_set = self.get_user_ids(self.cursor, id_begin, BATCH_SIZE)
            if not user_set:
                break
            user_cnt += len(user_set)
            print(id_begin)
            # Rows are ordered by id, so the last row is the next page start.
            id_begin = int(user_set[-1][0])
            self.__handle(user_set)
            print("%d, %d" % (len(user_set), user_cnt))
            print(datetime.datetime.now())
            if len(user_set) < BATCH_SIZE:
                # A short page means the table has been read completely.
                break
            time.sleep(BATCH_PAUSE_SECONDS)

    def __handle(self, user_set):
        """Group ``(id, type)`` rows by login type and queue the SQL for each group.

        Types 0/1/3/4 map to the web/wap/im/im_click columns, a type listed in
        the module-level ``clients`` maps to ``client``, anything else to
        ``others``.  For every non-empty group an ``insert ignore`` (creates
        missing rows) followed by an ``update`` (sets the day bit on existing
        rows) is put on the queue.
        """
        buckets = dict((column, []) for column in self._COLUMN_ORDER)
        for user in user_set:
            column = self._TYPE_TO_COLUMN.get(user[1])
            if column is None:
                column = "client" if user[1] in clients else "others"
            buckets[column].append(user[0])

        month = self.date.strftime('%Y%m')
        day_bit = self.date.day - 1  # bit position of this day within the month
        for column in self._COLUMN_ORDER:
            ids = buckets[column]
            if not ids:
                print("len=0: %s" % column)
                continue
            values = ",".join("(%d, 1<<%d, 1<<%d)" % (uid, day_bit, day_bit)
                              for uid in ids)
            id_list = ",".join("%d" % uid for uid in ids)
            sql = "insert ignore into user_login_%s (id, login_days, %s) values %s" % (
                month, column, values)
            self.queue.put(sql)
            # Record the login day as a bit in both the total and the type column.
            sql = "update user_login_%s set login_days=login_days|1<<%d, %s=%s|1<<%d where id in (%s)" % (
                month, day_bit, column, column, day_bit, id_list)
            self.queue.put(sql)

    def get_user_ids(self, cursor, id_begin=0, limit=10000):
        """Return up to ``limit`` ``(id, type)`` rows with ``id > id_begin``, ordered by id.

        The rows come from ``devintool.get_set_cur`` -- presumably it executes
        the query on ``cursor`` and returns the fetched rows; confirm there.
        """
        sql = "select id, type from iplog_typeid_%s where id>%d order by id limit %d" % (
            self.date.strftime("%Y%m%d"), id_begin, limit)
        return t.get_set_cur(sql, cursor)


def getClients():
    """Return the list of ``app_id`` values from the ``ClientDefine`` table."""
    sql = "select app_id from ClientDefine;"
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to configuration or the environment.
    conn = MySQLdb.connect(host="10.22.241.148", user="xiaoneilogs",
                           passwd="Pycd8452", port=3306, db="xnstat",
                           charset="utf8")
    try:
        rs = t.get_set_cur(sql, conn.cursor())
        return [row[0] for row in rs]
    finally:
        conn.close()


if __name__ == "__main__":
    if len(sys.argv) > 1:
        date_str = sys.argv[1]
        print(date_str)
        parts = date_str.split('-')
        date = datetime.datetime(int(parts[0]), int(parts[1]), int(parts[2]))
        print(date)
    else:
        # Default to yesterday.
        date = (datetime.datetime.today() - datetime.timedelta(days=1)).date()
        print(date)

    clients = getClients()
    print("clients:n %s" % (clients,))

    conn1 = Conn(host="10.22.200.224", db="ugc_xn", port=3306)
    # Second target is defined but currently not included in the loop below.
    conn2 = Conn(host="123.125.45.215", db="user_login", port=3306)
    for conn in (conn1, ):
        sql = "create table if not exists user_login_%s like user_login; " % date.strftime('%Y%m')
        print(sql)
        conn.open()
        try:
            conn.cursor().execute(sql)
        finally:
            conn.close()

    queue = q.Queue(THREAD_NUM)
    for i in range(THREAD_NUM):
        th = mysqlThread(host1='10.22.200.224', port1=3306, db1='ugc_xn', queue=queue)
        th.daemon = True
        th.start()

    iplog = iplogThread(queue, date)
    iplog.start()
    # Wait for the producer first: joining the queue while it is still empty
    # returns immediately and would let the process exit with work pending.
    iplog.join()
    queue.join()
分类: 技术收集