88import threading
99import socket
1010
11+
1112class ClosableSSEClient (SSEClient ):
13+
1214 def __init__ (self , * args , ** kwargs ):
1315 self .should_connect = True
1416 super (ClosableSSEClient , self ).__init__ (* args , ** kwargs )
17+
1518 def _connect (self ):
1619 if self .should_connect :
1720 super (ClosableSSEClient , self )._connect ()
1821 else :
1922 raise StopIteration ()
23+
2024 def close (self ):
2125 self .should_connect = False
2226 self .retry = 0
23- # HACK: dig through the sseclient library to the requests library down to the underlying socket.
24- # then close that to raise an exception to get out of streaming. I should probably file an issue w/ the
25- # requests library to make this easier
2627 try :
2728 self .resp .raw ._fp .fp ._sock .shutdown (socket .SHUT_RDWR )
2829 self .resp .raw ._fp .fp ._sock .close ()
2930 except AttributeError :
3031 pass
3132
33+
3234class RemoteThread (threading .Thread ):
35+
3336 def __init__ (self , URL , function ):
3437 self .function = function
3538 self .URL = URL
3639 super (RemoteThread , self ).__init__ ()
40+
3741 def run (self ):
3842 try :
3943 self .sse = ClosableSSEClient (self .URL )
@@ -46,20 +50,23 @@ def run(self):
4650 pass # this can happen when we close the stream
4751 except KeyboardInterrupt :
4852 self .close ()
53+
4954 def close (self ):
5055 if self .sse :
5156 self .sse .close ()
5257
58+
5359def firebaseURL (URL ):
5460 if '.firebaseio.com' not in URL .lower ():
5561 if '.json' == URL [- 5 :]:
5662 URL = URL [:- 5 ]
5763 if '/' in URL :
5864 if '/' == URL [- 1 ]:
5965 URL = URL [:- 1 ]
60- URL = 'https://' + URL .split ('/' )[0 ]+ '.firebaseio.com/' + URL .split ('/' ,1 )[1 ] + '.json'
66+ URL = 'https://' + \
67+ URL .split ('/' )[0 ] + '.firebaseio.com/' + URL .split ('/' , 1 )[1 ] + '.json'
6168 else :
62- URL = 'https://' + URL + '.firebaseio.com/.json'
69+ URL = 'https://' + URL + '.firebaseio.com/.json'
6370 return URL
6471
6572 if 'http://' in URL :
@@ -73,36 +80,51 @@ def firebaseURL(URL):
7380 URL = URL + '.json'
7481 return URL
7582
83+
7684class subscriber :
85+
7786 def __init__ (self , URL , function ):
7887 self .remote_thread = RemoteThread (firebaseURL (URL ), function )
88+
7989 def start (self ):
8090 self .remote_thread .start ()
91+
8192 def stop (self ):
8293 self .remote_thread .close ()
8394 self .remote_thread .join ()
95+
8496 def wait (self ):
8597 self .remote_thread .join ()
8698
99+
100+ class FirebaseException (Exception ):
101+ pass
102+
103+
87104def put (URL , msg ):
88105 to_post = json .dumps (msg )
89- response = requests .put (firebaseURL (URL ), data = to_post )
106+ response = requests .put (firebaseURL (URL ), data = to_post )
90107 if response .status_code != 200 :
91- raise Exception (response .text )
108+ raise FirebaseException (response .text )
109+
92110
93111def patch (URL , msg ):
94112 to_post = json .dumps (msg )
95- response = requests .patch (firebaseURL (URL ), data = to_post )
113+ response = requests .patch (firebaseURL (URL ), data = to_post )
96114 if response .status_code != 200 :
97- raise Exception (response .text )
115+ raise FirebaseException (response .text )
98116
99117
100- '''
101- Yuck, I don't want to write documentation for this :p
102- def push(URL, msg):
103- to_post = json.dumps(msg)
104- response = requests.post(firebaseURL(URL), data = to_post)
118+ def get (URL ):
119+ response = requests .get (firebaseURL (URL ))
105120 if response .status_code != 200 :
106- raise Exception(response.text)
107- '''
121+ raise FirebaseException (response .text )
122+ return json .loads (response .text )
123+
108124
125+ # Yuck, I don't want to write documentation for this :p
126+ #def push(URL, msg):
127+ # to_post = json.dumps(msg)
128+ # response = requests.post(firebaseURL(URL), data=to_post)
129+ # if response.status_code != 200:
130+ # raise Exception(response.text)
0 commit comments