forked from tatobari/hx711py
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathemulated_hx711.py
330 lines (223 loc) · 9.47 KB
/
emulated_hx711.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import time
import random
import math
import threading
class HX711:
def __init__(self, dout, pd_sck, gain=128):
self.PD_SCK = pd_sck
self.DOUT = dout
# Last time we've been read.
self.lastReadTime = time.time()
self.sampleRateHz = 80.0
self.resetTimeStamp = time.time()
self.sampleCount = 0
self.simulateTare = False
# Mutex for reading from the HX711, in case multiple threads in client
# software try to access get values from the class at the same time.
self.readLock = threading.Lock()
self.GAIN = 0
self.REFERENCE_UNIT = 1 # The value returned by the hx711 that corresponds to your reference unit AFTER dividing by the SCALE.
self.OFFSET = 1
self.lastVal = long(0)
self.DEBUG_PRINTING = False
self.byte_format = 'MSB'
self.bit_format = 'MSB'
self.set_gain(gain)
# Think about whether this is necessary.
time.sleep(1)
def convertToTwosComplement24bit(self, inputValue):
# HX711 has saturating logic.
if inputValue >= 0x7fffff:
return 0x7fffff
# If it's a positive value, just return it, masked with our max value.
if inputValue >= 0:
return inputValue & 0x7fffff
if inputValue < 0:
# HX711 has saturating logic.
if inputValue < -0x800000:
inputValue = -0x800000
diff = inputValue + 0x800000
return 0x800000 + diff
def convertFromTwosComplement24bit(self, inputValue):
return -(inputValue & 0x800000) + (inputValue & 0x7fffff)
def is_ready(self):
# Calculate how long we should be waiting between samples, given the
# sample rate.
sampleDelaySeconds = 1.0 / self.sampleRateHz
return time.time() >= self.lastReadTime + sampleDelaySeconds
def set_gain(self, gain):
if gain is 128:
self.GAIN = 1
elif gain is 64:
self.GAIN = 3
elif gain is 32:
self.GAIN = 2
# Read out a set of raw bytes and throw it away.
self.readRawBytes()
def get_gain(self):
if self.GAIN == 1:
return 128
if self.GAIN == 3:
return 64
if self.GAIN == 2:
return 32
# Shouldn't get here.
return 0
def readRawBytes(self):
# Wait for and get the Read Lock, incase another thread is already
# driving the virtual HX711 serial interface.
self.readLock.acquire()
# Wait until HX711 is ready for us to read a sample.
while not self.is_ready():
pass
self.lastReadTime = time.time()
# Generate a 24bit 2s complement sample for the virtual HX711.
rawSample = self.convertToTwosComplement24bit(self.generateFakeSample())
# Read three bytes of data from the HX711.
firstByte = (rawSample >> 16) & 0xFF
secondByte = (rawSample >> 8) & 0xFF
thirdByte = rawSample & 0xFF
# Release the Read Lock, now that we've finished driving the virtual HX711
# serial interface.
self.readLock.release()
# Depending on how we're configured, return an orderd list of raw byte
# values.
if self.byte_format == 'LSB':
return [thirdByte, secondByte, firstByte]
else:
return [firstByte, secondByte, thirdByte]
def read_long(self):
# Get a sample from the HX711 in the form of raw bytes.
dataBytes = self.readRawBytes()
if self.DEBUG_PRINTING:
print(dataBytes,)
# Join the raw bytes into a single 24bit 2s complement value.
twosComplementValue = ((dataBytes[0] << 16) |
(dataBytes[1] << 8) |
dataBytes[2])
if self.DEBUG_PRINTING:
print("Twos: 0x%06x" % twosComplementValue)
# Convert from 24bit twos-complement to a signed value.
signedIntValue = self.convertFromTwosComplement24bit(twosComplementValue)
# Record the latest sample value we've read.
self.lastVal = signedIntValue
# Return the sample value we've read from the HX711.
return int(signedIntValue)
def read_average(self, times=3):
# Make sure we've been asked to take a rational amount of samples.
if times <= 0:
print("HX711().read_average(): times must >= 1!! Assuming value of 1.")
times = 1
# If we're only average across one value, just read it and return it.
if times == 1:
return self.read_long()
# If we're averaging across a low amount of values, just take an
# arithmetic mean.
if times < 5:
values = int(0)
for i in range(times):
values += self.read_long()
return values / times
# If we're taking a lot of samples, we'll collect them in a list, remove
# the outliers, then take the mean of the remaining set.
valueList = []
for x in range(times):
valueList += [self.read_long()]
valueList.sort()
# We'll be trimming 20% of outlier samples from top and bottom of collected set.
trimAmount = int(len(valueList) * 0.2)
# Trim the edge case values.
valueList = valueList[trimAmount:-trimAmount]
# Return the mean of remaining samples.
return sum(valueList) / len(valueList)
def get_value(self, times=3):
return self.read_average(times) - self.OFFSET
def get_weight(self, times=3):
value = self.get_value(times)
value = value / self.REFERENCE_UNIT
return value
def tare(self, times=15):
# If we aren't simulating Taring because it takes too long, just skip it.
if not self.simulateTare:
return 0
# Backup REFERENCE_UNIT value
reference_unit = self.REFERENCE_UNIT
self.set_reference_unit(1)
value = self.read_average(times)
if self.DEBUG_PRINTING:
print("Tare value:", value)
self.set_offset(value)
# Restore the reference unit, now that we've got our offset.
self.set_reference_unit(reference_unit)
return value;
def set_reading_format(self, byte_format="LSB", bit_format="MSB"):
if byte_format == "LSB":
self.byte_format = byte_format
elif byte_format == "MSB":
self.byte_format = byte_format
else:
print("Unrecognised byte_format: \"%s\"" % byte_format)
if bit_format == "LSB":
self.bit_format = bit_format
elif bit_format == "MSB":
self.bit_format = bit_format
else:
print("Unrecognised bit_format: \"%s\"" % bit_format)
def set_offset(self, offset):
self.OFFSET = offset
def get_offset(self):
return self.OFFSET
def set_reference_unit(self, reference_unit):
# Make sure we aren't asked to use an invalid reference unit.
if reference_unit == 0:
print("HX711().set_reference_unit(): Can't use 0 as a reference unit!!")
return
self.REFERENCE_UNIT = reference_unit
def power_down(self):
# Wait for and get the Read Lock, incase another thread is already
# driving the HX711 serial interface.
self.readLock.acquire()
# Wait 100us for the virtual HX711 to power down.
time.sleep(0.0001)
# Release the Read Lock, now that we've finished driving the HX711
# serial interface.
self.readLock.release()
def power_up(self):
# Wait for and get the Read Lock, incase another thread is already
# driving the HX711 serial interface.
self.readLock.acquire()
# Wait 100 us for the virtual HX711 to power back up.
time.sleep(0.0001)
# Release the Read Lock, now that we've finished driving the HX711
# serial interface.
self.readLock.release()
# HX711 will now be defaulted to Channel A with gain of 128. If this
# isn't what client software has requested from us, take a sample and
# throw it away, so that next sample from the HX711 will be from the
# correct channel/gain.
if self.get_gain() != 128:
self.readRawBytes()
def reset(self):
# self.power_down()
# self.power_up()
# Mark time when we were reset. We'll use this for sample generation.
self.resetTimeStamp = time.time()
def generateFakeSample(self):
sampleTimeStamp = time.time() - self.resetTimeStamp
noiseScale = 1.0
noiseValue = random.randrange(-(noiseScale * 1000),(noiseScale * 1000)) / 1000.0
sample = math.sin(math.radians(sampleTimeStamp * 20)) * 72.0
self.sampleCount += 1
if sample < 0.0:
sample = -sample
sample += noiseValue
BIG_ERROR_SAMPLE_FREQUENCY = 142
###BIG_ERROR_SAMPLE_FREQUENCY = 15
BIG_ERROR_SAMPLES = [0.0, 40.0, 70.0, 150.0, 280.0, 580.0]
if random.randrange(0, BIG_ERROR_SAMPLE_FREQUENCY) == 0:
sample = random.sample(BIG_ERROR_SAMPLES, 1)[0]
print("Sample %d: Injecting %f as a random bad sample." % (self.sampleCount, sample))
sample *= 1000
sample *= self.REFERENCE_UNIT
return int(sample)
# EOF - emulated_hx711.py