-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchaturbate-estim-2b.py
executable file
·149 lines (123 loc) · 4.14 KB
/
chaturbate-estim-2b.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Chaturbate E-stim 2B controller: connects Chaturbate chat to an E-Stim Systems 2B
# Copyright (C) 2021 cb-stimmer
import asyncio
import functools
import json
import os
import random
import re
import string
import sys
import threading
import time
import traceback
import urllib.request
from datetime import datetime
from numbers import Number
from queue import Empty, SimpleQueue
import warnings
from Chaturbate import Chaturbate
from Tip import Tip
from estim2bCom import estim2bCom
import requests
import websockets
from pipe import Pipe
# local versions of libraries
sys.path.insert(0, 'buttplug')
sys.path.insert(0, 'python_readchar\\readchar')
from readchar import key as KEY
from readchar import readkey
# Module-wide configuration; replaced by load_settings() with the parsed
# contents of settings.json.
settings = {}


def load_settings():
    """Load ``settings.json`` from the current working directory.

    The parsed JSON value replaces the module-level ``settings`` object.

    Raises:
        OSError: if ``settings.json`` cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    global settings
    # JSON text is UTF-8 by specification. Without an explicit encoding,
    # open() uses the locale's preferred encoding, which mis-decodes
    # non-ASCII values on systems where that encoding is not UTF-8.
    with open("settings.json", "r", encoding="utf-8") as read_file:
        settings = json.load(read_file)
def main():
    """Start the two worker threads and drive them from the keyboard.

    Loads the settings, creates a ``Pipe`` and hands ``pipe_a`` to the
    communicator thread and ``pipe_b`` to the chat-watcher thread. It then
    reads single key presses and puts control messages on those pipe ends:

    * ``q`` - quit
    * ``a`` / ``z`` - put ``('delay', 0.5)`` / ``('delay', -0.5)``
    * ``c`` - prompt for a broadcaster name and put ``('broadcaster', name)``
      on both pipe ends
    * ``l`` - put ``'levels_reload'``

    The initial broadcaster is taken from ``sys.argv[1]`` and defaults to an
    empty string. On the way out an ``Exception`` instance is put on both
    pipe ends and both threads are joined.
    """
    load_settings()
    broadcaster = sys.argv[1] if len(sys.argv) >= 2 else ''
    pipe = Pipe()
    pipe_comm = pipe.pipe_a
    pipe_watcher = pipe.pipe_b
    # For debugging, comm_dummy can stand in for communicator_runner and
    # comm_test for chat_watcher_runner; both take the same arguments.
    comm_thread = threading.Thread(
        target=communicator_runner, args=(pipe_comm, broadcaster))
    watcher_thread = threading.Thread(
        target=chat_watcher_runner, args=(pipe_watcher, broadcaster))
    comm_thread.start()
    watcher_thread.start()
    print("started threads")
    print("controls: [q]uit\t[a]/[z] delay\t[c]hange broadcaster\t[l]evels reload")
    try:
        while True:
            inp = readkey()
            if inp == 'q':
                # Quit deliberately shares the error path so that console
                # output and shutdown are the same in both cases.
                raise Exception('Quit pressed')
            elif inp == 'a':
                pipe_watcher.put(('delay', 0.5))
            elif inp == 'z':
                pipe_watcher.put(('delay', -0.5))
            elif inp == 'c':
                new_broadcaster = input('Enter new broadcaster: ')
                pipe_comm.put(('broadcaster', new_broadcaster))
                pipe_watcher.put(('broadcaster', new_broadcaster))
            elif inp == 'l':
                # NOTE(review): this message is a bare string, unlike the
                # tuple messages above (the original spelled it
                # ('levels_reload'), which is not a tuple). Kept as-is;
                # confirm what the consumer expects.
                pipe_comm.put('levels_reload')
    except Exception as ex:
        print('main thread error', ex)
    finally:
        # Runs on every exit, including KeyboardInterrupt (e.g. Ctrl+C at the
        # input() prompt), which `except Exception` does not catch.
        # An Exception instance on the pipe is the stop signal: comm_dummy
        # re-raises one when it reads it. Presumably the real workers do the
        # same - confirm in their modules.
        pipe_comm.put(Exception('main thread error'))
        pipe_watcher.put(Exception('main thread error'))
        print("finally")
        # The joins used to sit after the infinite loop and never ran. The
        # threads are non-daemon, so the interpreter waits for them at exit
        # anyway; joining here makes that wait explicit.
        watcher_thread.join()
        comm_thread.join()
def communicator_runner(pipe, broadcaster):
    """Thread entry point for the E-stim side.

    Builds an ``estim2bCom`` from the "serialPort", "tipLevels",
    "specialTips", "mode" and "power" entries of the module-level
    ``settings`` (in that positional order), then runs its ``communicator``
    coroutine to completion with ``asyncio.run``.
    """
    setting_keys = ("serialPort", "tipLevels", "specialTips", "mode", "power")
    estim = estim2bCom(*[settings[key] for key in setting_keys])
    session = estim.communicator(pipe, broadcaster)
    asyncio.run(session)
    print('communicator_runner finished')
def comm_test(tips_queue, broadcaster):
    """Debug producer: feed a fixed cycle of fake tips into *tips_queue*.

    Repeats forever: for each scripted step, print its message, put a
    ``Tip`` for the step's amount, then sleep. Each cycle ends with a
    10-second pause during which nothing is sent. *broadcaster* is accepted
    only so the signature matches the other thread targets; it is not used.

    Any ``Exception`` ends the loop: its traceback is printed and the
    function returns.
    """
    # (console message, tip amount, seconds to sleep afterwards)
    script = (
        ('test: 666 wave 10s', 666, 12),
        ('test: 777 pulse 10s', 777, 12),
        ('test: 888 earthquake 10s', 888, 12),
        ('test: 999 fireworks 10s', 999, 15),
    )
    try:
        while True:
            for message, amount, pause in script:
                print(message)
                # The second argument is a timestamp six seconds in the past;
                # presumably the tip's time - confirm against Tip.
                tips_queue.put(Tip(amount, time.time() - 6))
                time.sleep(pause)
            print('test: nothing 10s')
            time.sleep(10)
    except Exception:
        print('comm_test error')
        print(traceback.format_exc())
    finally:
        print('comm_test done')
def chat_watcher_runner(tips_queue, broadcaster):
    """Thread entry point for the chat side.

    Creates a ``Chaturbate`` instance and runs its ``chat_watcher``
    coroutine to completion with ``asyncio.run``, passing along
    *tips_queue* and *broadcaster*.
    """
    watcher = Chaturbate()
    session = watcher.chat_watcher(tips_queue, broadcaster)
    asyncio.run(session)
    print('chat_watcher_runner finished')
def comm_dummy(pipe, broadcaster):
    """Debug consumer: print everything read from *pipe*.

    Loops forever reading elements with ``pipe.get()``. When ``get`` raises
    ``Empty`` the loop sleeps ten seconds and tries again. An element that
    is an exception instance is raised, which ends the loop; the traceback
    is printed and the function returns. *broadcaster* is accepted only so
    the signature matches the other thread targets; it is not used.
    """
    try:
        while True:
            try:
                el = pipe.get()
            except Empty:
                time.sleep(10)
                continue
            print('recv' + str(el) + '\r\n')
            if isinstance(el, Tip):
                print("type is tip")
            # NOTE(review): assumed to run for every element, not only tips -
            # confirm against the upstream file.
            print(type(el))
            # isinstance rather than an exact type match, so that Exception
            # subclasses sent as a stop signal are honoured too.
            if isinstance(el, Exception):
                raise el
    except Exception:
        print('comm_dummy error')
        print(traceback.format_exc())
# Start the controller only when this file is executed as a script.
if __name__ == '__main__':
    main()