-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimeout.py
119 lines (88 loc) · 2.41 KB
/
timeout.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import time
from itertools import count
from multiprocessing import Process
import requests
import threading
import json
from MakeTweet import trackTweet
from CheckPrices import start_trecking
class perpetualTimer():
    """Call ``hFunction`` repeatedly, once every ``t`` seconds.

    Each tick runs on a fresh ``threading.Timer`` thread. After the callable
    returns (or raises), the next timer is created and started, until
    ``cancel()`` is called.

    Args:
        t: Delay in seconds between ticks (passed to ``threading.Timer``).
        hFunction: The callable to invoke on every tick.
        args: Positional arguments for ``hFunction`` as a sequence, or
            ``None`` to call it with no arguments.
    """

    def __init__(self, t, hFunction, args=None):
        self.t = t
        self.args = args
        print("thi is is t", self.t)
        self.hFunction = hFunction
        print('this is function passed', self.hFunction)
        # Guards `thread` and `_cancelled` so that cancel() cannot interleave
        # with a tick that is in the middle of scheduling its successor.
        self._lock = threading.Lock()
        self._cancelled = False
        self.thread = threading.Timer(self.t, self.handle_function)

    def handle_function(self, *args):
        """Run one tick: invoke the callable, then schedule the next tick.

        Extra positional arguments are accepted and ignored; the callable is
        always invoked with the arguments given to the constructor.
        """
        with self._lock:
            if self._cancelled:
                return
        print("starting passed function....")
        print("these are args to funtion", self.args)
        call_args = () if self.args is None else self.args
        try:
            self.hFunction(*call_args)
        except Exception:
            # One failed tick must not end the schedule, and the callable
            # must not be re-invoked with different arguments. Report the
            # failure and carry on with the next tick.
            import traceback
            traceback.print_exc()
        with self._lock:
            # cancel() may have been called while the callable was running;
            # in that case do not start another timer.
            if self._cancelled:
                return
            self.thread = threading.Timer(self.t, self.handle_function)
            self.thread.start()

    def start(self):
        """Start the timer that will fire the first tick after ``t`` seconds."""
        self.thread.start()

    def cancel(self):
        """Stop the schedule: cancel the pending timer and block rescheduling."""
        with self._lock:
            self._cancelled = True
            self.thread.cancel()
def Callback():
    """Fetch JSON from the Heroku app endpoint and print it.

    Also prints the number of threads currently alive. Returns ``None``.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            the server does not answer within the timeout.
        ValueError: presumably, when the response body is not valid JSON
            (raised by ``r.json()``).
    """
    # NOTE(review): the credentials below look like placeholders - confirm
    # and move real ones out of source control.
    # The timeout keeps a stalled server from blocking the timer thread
    # that runs this callback indefinitely.
    r = requests.get(
        'http://akshaykaluchascriptapp.herokuapp.com/',
        auth=('user', 'pass'),
        timeout=30,
    )
    data = r.json()
    print(data)
    print(threading.active_count(), "threads active after callback")
# Module-level handle for the timer created by startThread(); initially None.
t = None
# Module-level handle for the timer created by startTweetTracker(); initially None.
tracker = None
# userTweet = userTweet
# user = "lifeofakshy"
# count = 2
def TweetTracker(user, userTweet):
    """Return the result of ``trackTweet(user, userTweet)``.

    If ``trackTweet`` raises an ordinary exception, the string
    ``"an error occured"`` is returned instead of propagating it.
    Interpreter-exit signals (``SystemExit``, ``KeyboardInterrupt``) are
    deliberately not intercepted.
    """
    try:
        return trackTweet(user, userTweet)
    except Exception:
        # Sentinel string kept exactly as-is: callers may compare against it.
        return "an error occured"
def startTweetTracker(user, userTweet):
    """Create and start the tweet-tracking timer.

    Builds a ``perpetualTimer`` that calls ``TweetTracker(user, userTweet)``
    every 20 seconds, stores it in the module-level ``tracker`` and returns
    whatever its ``start()`` returns.
    """
    global tracker
    poll_args = (user, userTweet)
    tracker = perpetualTimer(20, TweetTracker, args=poll_args)
    started = tracker.start()
    return started
def stopTweetTracker():
    """Cancel the tweet-tracking timer, if any, and reset ``tracker`` to None.

    Safe to call when no tracker is running. Prints the (now ``None``)
    handle afterwards.
    """
    global tracker
    if tracker is not None:
        tracker.cancel()
    # Rebind instead of `del tracker`: deleting the global made the print
    # below, and any later read of `tracker`, raise NameError.
    tracker = None
    print(tracker)
# startTweetTracker(user=user, userTweet=userTweet)
# WAIT_TIME_SECONDS = 5
# ticker = threading.Event()
# while not ticker.wait(WAIT_TIME_SECONDS):
# Callback()
# Module-level handle for the timer created by startPCSearch(); initially None.
crawling = None
def startPCSearch():
    """Create and start the price-crawl timer.

    Announces the start, then stores in the module-level ``crawling`` a
    ``perpetualTimer`` that calls ``start_trecking`` every 6 seconds, and
    starts it.
    """
    global crawling
    print("Amazon crawling starting......")
    interval_seconds = 6
    crawling = perpetualTimer(interval_seconds, start_trecking, args=None)
    crawling.start()
def cancelPCSearch():
    """Cancel the price-crawl timer, if any, and reset ``crawling`` to None.

    Safe to call repeatedly or before ``startPCSearch()``.
    """
    global crawling
    if crawling is not None:
        crawling.cancel()
    # Rebind instead of `del crawling`: deleting the global made every later
    # read of `crawling` (including a second cancel) raise NameError.
    crawling = None
def startThread():
    """Create and start the periodic ``Callback`` timer.

    Stores in the module-level ``t`` a ``perpetualTimer`` that calls
    ``Callback`` every 300 seconds, and starts it.
    """
    global t
    interval_seconds = 300
    t = perpetualTimer(interval_seconds, Callback, args=None)
    t.start()
# startThread()
def cancelThread():
    """Cancel the periodic ``Callback`` timer, if any, and reset ``t`` to None.

    Safe to call when no timer is running. Prints the (now ``None``) handle
    afterwards.
    """
    global t
    if t is not None:
        t.cancel()
    # Rebind instead of `del t`: deleting the global made the print below,
    # and any later read of `t`, raise NameError.
    t = None
    print(t)
# Running this module as a script does nothing; the timers are started by
# calling the start* functions above.
if __name__ == "__main__":
    pass