自学习

本节例程的位置在 百度云盘资料\野火K210 AI视觉相机\1-教程文档_例程源码\例程\10-KPU\self_learn_classifier\self_learning.py

介绍

机器自动学习,通过拍摄5张物品照片来记录该物品的特征,当特征点匹配上了即可识别出该物体,可用于对特定物品的识别

例程

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
'''
此示例展示利用KPU神经网络,进行自分类学习的功能。
它共有三种模式:初始化、训练、分类。
操作指南:
1. 初始化模式:按下boot键,进入初始化模式,此时屏幕显示“Init”。
2. 训练模式:按下boot键,进入训练模式,此时屏幕显示“Train object 1”。
    按下boot键,拍摄一张图片,屏幕显示“Train object 1\r\n\r\nBoot key to take #P2”。
    按下boot键,拍摄一张图片,屏幕显示“Train object 1\r\n\r\nBoot key to take #P3”。
3. 分类模式:按下boot键,进入分类模式,此时屏幕显示“Classification”。

'''
import gc
import lcd
import sensor
import time
from maix import GPIO
from maix import KPU
from board import board_info
from fpioa_manager import fm
from image import Image


####################################################################################################################
class STATE(object):
    IDLE = 0
    INIT = 1
    TRAIN_CLASS_1 = 2
    TRAIN_CLASS_2 = 3
    TRAIN_CLASS_3 = 4
    CLASSIFY = 5
    STATE_MAX = 6


class EVENT(object):
    POWER_ON = 0            # virtual event, 用于上电初始化
    BOOT_KEY = 1            # boot键按下
    BOOT_KEY_LONG_PRESS = 2 # boot键长按约3秒
    EVENT_NEXT_MODE = 3     # virtual event, 用于切换到下一个模式
    EVENT_MAX = 4


class StateMachine(object):
    def __init__(self, state_handlers, event_handlers, transitions):
        self.previous_state = STATE.IDLE
        self.current_state = STATE.IDLE
        self.state_handlers = state_handlers
        self.event_handlers = event_handlers
        self.transitions = transitions

    def reset(self):
        '''
        重置状态机
        :return:
        '''
        self.previous_state = STATE.IDLE
        self.current_state = STATE.IDLE

    def get_next_state(self, cur_state, cur_event):
        '''
        根据当着状态和event, 从transitions表里查找出下一个状态
        :param cur_state:
        :param cur_event:
        :return:
            next_state: 下一状态
            None: 找不到对应状态
        '''
        for cur, next, event in self.transitions:
            if cur == cur_state and event == cur_event:
                return next
        return None

    # execute action before enter current state
    def enter_state_action(self, state, event):
        '''
        执行当前状态对应的进入action
        :param state: 当前状态
        :param event: 当前event
        :return:
        '''
        try:
            if self.state_handlers[state][0]:
                self.state_handlers[state][0](self, state, event)
        except Exception as e:
            print(e)

    # execute action of current state
    def execute_state_action(self, state, event):
        '''
        执行当前状态action函数
        :param state:   当前状态
        :param event:   当前event
        :return:
        '''
        try:
            if self.state_handlers[state][1]:
                self.state_handlers[state][1](self, state, event)
        except Exception as e:
            print(e)

    # execute action when exit state
    def exit_state_action(self, state, event):
        '''
        执行当前状态的退出action
        :param state: 当前状态
        :param event: 当前event
        :return:
        '''
        try:
            if self.state_handlers[state][2]:
                self.state_handlers[state][2](self, state, event)
        except Exception as e:
            print(e)

    def emit_event(self, event):
        '''
        发送event。根据当前状态和event,查找下一个状态,然后执行对应的action。
        :param event: 要发送的event
        :return:
        '''
        next_state = self.get_next_state(self.current_state, event)

        # execute enter function and exit function when state changed
        if next_state != None and next_state != self.current_state:
            self.exit_state_action(self.previous_state, event)
            self.previous_state = self.current_state
            self.current_state = next_state
            self.enter_state_action(self.current_state, event)
            print("event valid: {}, cur: {}, next: {}".format(event, self.current_state, next_state))

        # call state action for each event
        self.execute_state_action(self.current_state, event)

    def engine(self):
        '''
        状态机引擎,用于执行状态机
        :return:
        '''
        pass

def restart(self):
    '''
    重新启动状态机程序
    :return:
    '''
    global features
    self.reset()
    features.clear()
    self.emit_event(EVENT.POWER_ON)


def enter_state_idle(self, state, event):
    print("enter state: idle")


def exit_state_idle(self, state, event):
    print("exit state: idle")


def state_idle(self, state, event):
    global central_msg
    print("current state: idle")
    central_msg = None


def enter_state_init(self, state, event):
    global img_init
    print("enter state: init")
    img_init = Image(size=(lcd.width(), lcd.height()))


def exit_state_init(self, state, event):
    print("exit state: init")
    del img_init


def state_init(self, state, event):
    print("current state: init, event: {}".format(event))

    # switch to next state when boot key is pressed
    if event == EVENT.BOOT_KEY:
        self.emit_event(EVENT.EVENT_NEXT_MODE)
    elif event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return


def enter_state_train_class_1(self, state, event):
    print("enter state: train class 1")
    global train_pic_cnt, central_msg, bottom_msg
    train_pic_cnt = 0
    central_msg = "Train class 1"
    bottom_msg = "Take pictures of 1st class"


def exit_state_train_class_1(self, state, event):
    print("exit state: train class 1")


def state_train_class_1(self, state, event):
    global kpu, central_msg, bottom_msg, features, train_pic_cnt
    global state_machine
    print("current state: class 1")

    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return

    if train_pic_cnt == 0:  # 0 is used for prompt only
        features.append([])
        train_pic_cnt += 1
    elif train_pic_cnt <= max_train_pic:
        central_msg = None
        img = sensor.snapshot()
        feature = kpu.run_with_output(img, get_feature=True)
        features[0].append(feature)
        bottom_msg = "Class 1: #P{}".format(train_pic_cnt)
        train_pic_cnt += 1
    else:
        state_machine.emit_event(EVENT.EVENT_NEXT_MODE)


def enter_state_train_class_2(self, state, event):
    print("enter state: train class 2")
    global train_pic_cnt, central_msg, bottom_msg
    train_pic_cnt = 0
    central_msg = "Train class 2"
    bottom_msg = "Change to 2nd class please"


def exit_state_train_class_2(self, state, event):
    print("exit state: train class 2")


def state_train_class_2(self, state, event):
    global kpu, central_msg, bottom_msg, features, train_pic_cnt
    global state_machine
    print("current state: class 2")

    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return

    if train_pic_cnt == 0:
        features.append([])
        train_pic_cnt += 1
    elif train_pic_cnt <= max_train_pic:
        central_msg = None
        img = sensor.snapshot()
        feature = kpu.run_with_output(img, get_feature=True)
        features[1].append(feature)
        bottom_msg = "Class 2: #P{}".format(train_pic_cnt)
        train_pic_cnt += 1
    else:
        state_machine.emit_event(EVENT.EVENT_NEXT_MODE)


def enter_state_train_class_3(self, state, event):
    print("enter state: train class 3")
    global train_pic_cnt, central_msg, bottom_msg
    train_pic_cnt = 0
    central_msg = "Train class 3"
    bottom_msg = "Change to 3rd class please"


def exit_state_train_class_3(self, state, event):
    print("exit state: train class 3")


def state_train_class_3(self, state, event):
    global kpu, central_msg, bottom_msg, features, train_pic_cnt
    global state_machine
    print("current state: class 3")

    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return

    if train_pic_cnt == 0:
        features.append([])
        train_pic_cnt += 1
    elif train_pic_cnt <= max_train_pic:
        central_msg = None
        img = sensor.snapshot()
        feature = kpu.run_with_output(img, get_feature=True)
        features[2].append(feature)
        bottom_msg = "Class 3: #P{}".format(train_pic_cnt)
        train_pic_cnt += 1
    else:
        state_machine.emit_event(EVENT.EVENT_NEXT_MODE)


def enter_state_classify(self, state, event):
    global central_msg, bottom_msg
    print("enter state: classify")
    central_msg = "Classification"
    bottom_msg = "Training complete! Start classification"



def exit_state_classify(self, state, event):
    print("exit state: classify")


def state_classify(self, state, event):
    global central_msg, bottom_msg
    print("current state: classify, {}, {}".format(state, event))
    if event == EVENT.BOOT_KEY:
        central_msg = None
    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return



def event_power_on(self, value=None):
    print("emit event: power_on")


def event_press_boot_key(self, value=None):
    global state_machine
    print("emit event: boot_key")


def event_long_press_boot_key(self, value=None):
    global state_machine
    print("emit event: boot_key_long_press")


# state action table format:
#   state: [enter_state_handler, execute_state_handler, exit_state_handler]
state_handlers = {
    STATE.IDLE: [enter_state_idle, state_idle, exit_state_idle],
    STATE.INIT: [enter_state_init, state_init, exit_state_init],
    STATE.TRAIN_CLASS_1: [enter_state_train_class_1, state_train_class_1, exit_state_train_class_1],
    STATE.TRAIN_CLASS_2: [enter_state_train_class_2, state_train_class_2, exit_state_train_class_2],
    STATE.TRAIN_CLASS_3: [enter_state_train_class_3, state_train_class_3, exit_state_train_class_3],
    STATE.CLASSIFY: [enter_state_classify, state_classify, exit_state_classify]
}

# event action table, can be enabled while needed
event_handlers = {
    EVENT.POWER_ON: event_power_on,
    EVENT.BOOT_KEY: event_press_boot_key,
    EVENT.BOOT_KEY_LONG_PRESS: event_long_press_boot_key
}

# Transition table
transitions = [
    [STATE.IDLE, STATE.INIT, EVENT.POWER_ON],
    [STATE.INIT, STATE.TRAIN_CLASS_1, EVENT.EVENT_NEXT_MODE],
    [STATE.TRAIN_CLASS_1, STATE.TRAIN_CLASS_2, EVENT.EVENT_NEXT_MODE],
    [STATE.TRAIN_CLASS_2, STATE.TRAIN_CLASS_3, EVENT.EVENT_NEXT_MODE],
    [STATE.TRAIN_CLASS_3, STATE.CLASSIFY, EVENT.EVENT_NEXT_MODE]
]


####################################################################################################################
class Button(object):
    DEBOUNCE_THRESHOLD = 30  # 消抖阈值
    LONG_PRESS_THRESHOLD = 1000  # 长按阈值
    # Internal  key states
    IDLE = 0
    DEBOUNCE = 1
    SHORT_PRESS = 2
    LONG_PRESS = 3

    def __init__(self, state_machine):
        self._state = Button.IDLE
        self._key_ticks = 0
        self._pre_key_state = 1
        self.SHORT_PRESS_BUF = None
        self.st = state_machine

    def reset(self):
        self._state = Button.IDLE
        self._key_ticks = 0
        self._pre_key_state = 1
        self.SHORT_PRESS_BUF = None

    def key_up(self, delta):
        # print("up:{}".format(delta))
        # key up时,有缓存的key信息就发出去,没有的话直接复位状态
        if self.SHORT_PRESS_BUF:
            self.st.emit_event(self.SHORT_PRESS_BUF)
        self.reset()

    def key_down(self, delta):
        # print("dn:{},t:{}".format(delta, self._key_ticks))
        if self._state == Button.IDLE:
            self._key_ticks += delta
            if self._key_ticks > Button.DEBOUNCE_THRESHOLD:
                # main loop period过大时,会直接跳过去抖阶段
                self._state = Button.SHORT_PRESS
                self.SHORT_PRESS_BUF = EVENT.BOOT_KEY  # key_up 时发送
            else:
                self._state = Button.DEBOUNCE
        elif self._state == Button.DEBOUNCE:
            self._key_ticks += delta
            if self._key_ticks > Button.DEBOUNCE_THRESHOLD:
                self._state = Button.SHORT_PRESS
                self.SHORT_PRESS_BUF = EVENT.BOOT_KEY  # key_up 时发送
        elif self._state == Button.SHORT_PRESS:
            self._key_ticks += delta
            if self._key_ticks > Button.LONG_PRESS_THRESHOLD:
                self._state = Button.LONG_PRESS
                self.SHORT_PRESS_BUF = None  # 检测到长按,将之前可能存在的短按buffer清除,以防发两个key event出去
                self.st.emit_event(EVENT.BOOT_KEY_LONG_PRESS)
        elif self._state == Button.LONG_PRESS:
            self._key_ticks += delta
            # 最迟 LONG_PRESS 发出信号,再以后就忽略,不需要处理。key_up时再退出状态机。
            pass
        else:
            pass


####################################################################################################################
# 未启用 state machine 的主循环,所以这里将需要循环执行的函数放在while True里
def loop_init():
    global lcd, img_init
    if state_machine.current_state != STATE.INIT:
        return

    img_init.draw_rectangle(0, 0, lcd.width(), lcd.height(), color=(0, 0, 255), fill=True, thickness=2)
    img_init.draw_string(65, 90, "Self Learning Demo", color=(255, 255, 255), scale=2)
    img_init.draw_string(5, 210, "short press:   next", color=(255, 255, 255), scale=1)
    img_init.draw_string(5, 225, "long press:      restart", color=(255, 255, 255), scale=1)
    lcd.display(img_init)


def loop_capture():
    global central_msg, bottom_msg
    img = sensor.snapshot()
    if central_msg:
        img.draw_rectangle(0, 90, lcd.width(), 22, color=(0, 0, 255), fill=True, thickness=2)
        img.draw_string(55, 90, central_msg, color=(255, 255, 255), scale=2)
    if bottom_msg:
        img.draw_string(5, 208, bottom_msg, color=(0, 0, 255), scale=1)
    lcd.display(img)


def loop_classify():
    global central_msg, bottom_msg
    img = sensor.snapshot()

    scores = []
    feature = kpu.run_with_output(img, get_feature=True)
    high = 0
    index = 0
    for j in range(len(features)):
        for f in features[j]:
            score = kpu.feature_compare(f, feature)
            if score > high:
                high = score
                index = j
    if high > THRESHOLD:
        bottom_msg = "class:{},score:{:2.1f}".format(index + 1, high)
    else:
        bottom_msg = None

    # display info
    if central_msg:
        print("central_msg:{}".format(central_msg))
        img.draw_rectangle(0, 90, lcd.width(), 22, color=(0, 255, 0), fill=True, thickness=2)
        img.draw_string(55, 90, central_msg, color=(255, 255, 255), scale=2)
    if bottom_msg:
        print("bottom_msg:{}".format(bottom_msg))
        img.draw_string(5, 208, bottom_msg, color=(0, 255, 0), scale=1)
    lcd.display(img)

####################################################################################################################
# main loop
features = []
THRESHOLD = 98.5    # 比对阈值,越大越严格
train_pic_cnt = 0   # 当前分类已训练图片数量
max_train_pic = 5   # 每个类别最多训练图片数量

central_msg = None  # 屏幕中间显示的信息
bottom_msg = None   # 屏幕底部显示的信息

fm.register(board_info.BOOT_KEY, fm.fpioa.GPIOHS0)
boot_gpio = GPIO(GPIO.GPIOHS0, GPIO.IN)

lcd.init()
sensor.reset(dual_buff=True)  # Reset and initialize the sensor. It will
# run automatically, call sensor.run(0) to stop
sensor.set_pixformat(sensor.RGB565)  # Set pixel format to RGB565 (or GRAYSCALE)
sensor.set_framesize(sensor.QVGA)  # Set frame size to QVGA (320x240)
sensor.set_windowing((224, 224))
sensor.set_vflip(1)
sensor.skip_frames(time=500)  # Wait for settings take effect.

kpu = KPU()
print("ready load model")
kpu.load_kmodel("/sd/KPU/self_learn_classifier/mb-0.25.kmodel")

state_machine = StateMachine(state_handlers, event_handlers, transitions)
state_machine.emit_event(EVENT.POWER_ON)

btn_ticks_prev = time.ticks_ms()
boot_btn = Button(state_machine)
while True:
    gc.collect()

    # 计算boot key被压下或弹起的时间,用于消抖和长按检测, 并且向state machine发送事件
    btn_ticks_cur = time.ticks_ms()
    delta = time.ticks_diff(btn_ticks_cur, btn_ticks_prev)
    btn_ticks_prev = btn_ticks_cur
    if boot_gpio.value() == 0:
        boot_btn.key_down(delta)
    else:
        boot_btn.key_up(delta)

    # 未启用 state machine 的主循环,所以这里将需要循环执行的函数放在while True里
    if state_machine.current_state == STATE.INIT:
        loop_init()

    elif state_machine.current_state == STATE.CLASSIFY:
        loop_classify()

    elif state_machine.current_state == STATE.TRAIN_CLASS_1 or state_machine.current_state == STATE.TRAIN_CLASS_2 \
            or state_machine.current_state == STATE.TRAIN_CLASS_3:
        loop_capture()

例程解析

1
2
3
4
5
6
7
8
9
import gc
import lcd
import sensor
import time
from maix import GPIO
from maix import KPU
from board import board_info
from fpioa_manager import fm
from image import Image
  • 导入模块:代码开始处导入了必要的模块,如gc(垃圾回收),lcd(液晶显示),sensor(传感器),time,以及maix和board相关的模块。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class STATE(object):
    IDLE = 0
    INIT = 1
    TRAIN_CLASS_1 = 2
    TRAIN_CLASS_2 = 3
    TRAIN_CLASS_3 = 4
    CLASSIFY = 5
    STATE_MAX = 6


class EVENT(object):
    POWER_ON = 0            # virtual event, 用于上电初始化
    BOOT_KEY = 1            # boot键按下
    BOOT_KEY_LONG_PRESS = 2 # boot键长按约3秒
    EVENT_NEXT_MODE = 3     # virtual event, 用于切换到下一个模式
    EVENT_MAX = 4
  • 状态和事件定义:使用STATE和EVENT类定义了状态机和事件。这些枚举类型用于表示状态机的不同状态和可能的事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class StateMachine(object):
    def __init__(self, state_handlers, event_handlers, transitions):
        self.previous_state = STATE.IDLE
        self.current_state = STATE.IDLE
        self.state_handlers = state_handlers
        self.event_handlers = event_handlers
        self.transitions = transitions

    def reset(self):
        '''
        重置状态机
        :return:
        '''
        self.previous_state = STATE.IDLE
        self.current_state = STATE.IDLE

    def get_next_state(self, cur_state, cur_event):
        '''
        根据当着状态和event, 从transitions表里查找出下一个状态
        :param cur_state:
        :param cur_event:
        :return:
            next_state: 下一状态
            None: 找不到对应状态
        '''
        for cur, next, event in self.transitions:
            if cur == cur_state and event == cur_event:
                return next
        return None

    # execute action before enter current state
    def enter_state_action(self, state, event):
        '''
        执行当前状态对应的进入action
        :param state: 当前状态
        :param event: 当前event
        :return:
        '''
        try:
            if self.state_handlers[state][0]:
                self.state_handlers[state][0](self, state, event)
        except Exception as e:
            print(e)

    # execute action of current state
    def execute_state_action(self, state, event):
        '''
        执行当前状态action函数
        :param state:   当前状态
        :param event:   当前event
        :return:
        '''
        try:
            if self.state_handlers[state][1]:
                self.state_handlers[state][1](self, state, event)
        except Exception as e:
            print(e)

    # execute action when exit state
    def exit_state_action(self, state, event):
        '''
        执行当前状态的退出action
        :param state: 当前状态
        :param event: 当前event
        :return:
        '''
        try:
            if self.state_handlers[state][2]:
                self.state_handlers[state][2](self, state, event)
        except Exception as e:
            print(e)

    def emit_event(self, event):
        '''
        发送event。根据当前状态和event,查找下一个状态,然后执行对应的action。
        :param event: 要发送的event
        :return:
        '''
        next_state = self.get_next_state(self.current_state, event)

        # execute enter function and exit function when state changed
        if next_state != None and next_state != self.current_state:
            self.exit_state_action(self.previous_state, event)
            self.previous_state = self.current_state
            self.current_state = next_state
            self.enter_state_action(self.current_state, event)
            print("event valid: {}, cur: {}, next: {}".format(event, self.current_state, next_state))

        # call state action for each event
        self.execute_state_action(self.current_state, event)

    def engine(self):
        '''
        状态机引擎,用于执行状态机
        :return:
        '''
        pass
  • StateMachine类是核心部分,它实现了状态机的所有功能。这个类有以下几个关键点:

  • __init__:初始化状态处理函数、事件处理函数和状态转换表。

  • reset:重置状态机到初始状态。

  • get_next_state:根据当前状态和事件,查找下一个状态。

  • enter_state_action、execute_state_action、exit_state_action:分别执行进入状态、执行状态中、退出状态的动作。

  • emit_event:触发事件,可能导致状态转换和动作执行。

  • engine:状态机的主循环(在这个代码片段中未实现)。

1
2
3
4
5
6
7
8
9
def restart(self):
    '''
    重新启动状态机程序
    :return:
    '''
    global features
    self.reset()
    features.clear()
    self.emit_event(EVENT.POWER_ON)
  • 重启函数:restart函数用于重启状态机,清除特征数据,并触发上电事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def enter_state_idle(self, state, event):
    print("enter state: idle")


def exit_state_idle(self, state, event):
    print("exit state: idle")


def state_idle(self, state, event):
    global central_msg
    print("current state: idle")
    central_msg = None


def enter_state_init(self, state, event):
    global img_init
    print("enter state: init")
    img_init = Image(size=(lcd.width(), lcd.height()))


def exit_state_init(self, state, event):
    print("exit state: init")
    del img_init


def state_init(self, state, event):
    print("current state: init, event: {}".format(event))

    # switch to next state when boot key is pressed
    if event == EVENT.BOOT_KEY:
        self.emit_event(EVENT.EVENT_NEXT_MODE)
    elif event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return
  • 状态处理函数:为IDLE和INIT状态定义了进入、退出和状态处理函数。例如,enter_state_idle和exit_state_idle分别在进入和退出空闲状态时打印消息,而state_idle处理在空闲状态时的事件。

  • 初始化状态处理:enter_state_init在进入初始化状态时创建一个图像对象,exit_state_init在退出时删除这个对象,state_init处理在初始化状态时的按键事件。

  • 事件处理:在state_init函数中,如果检测到BOOT键按下,状态机会触发一个事件以转换到下一个模式;如果检测到BOOT键长按,则会重启状态机。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def enter_state_train_class_1(self, state, event):
    print("enter state: train class 1")
    global train_pic_cnt, central_msg, bottom_msg
    train_pic_cnt = 0
    central_msg = "Train class 1"
    bottom_msg = "Take pictures of 1st class"


def exit_state_train_class_1(self, state, event):
    print("exit state: train class 1")


def state_train_class_1(self, state, event):
    global kpu, central_msg, bottom_msg, features, train_pic_cnt
    global state_machine
    print("current state: class 1")

    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return

    if train_pic_cnt == 0:  # 0 is used for prompt only
        features.append([])
        train_pic_cnt += 1
    elif train_pic_cnt <= max_train_pic:
        central_msg = None
        img = sensor.snapshot()
        feature = kpu.run_with_output(img, get_feature=True)
        features[0].append(feature)
        bottom_msg = "Class 1: #P{}".format(train_pic_cnt)
        train_pic_cnt += 1
    else:
        state_machine.emit_event(EVENT.EVENT_NEXT_MODE)


def enter_state_train_class_2(self, state, event):
    print("enter state: train class 2")
    global train_pic_cnt, central_msg, bottom_msg
    train_pic_cnt = 0
    central_msg = "Train class 2"
    bottom_msg = "Change to 2nd class please"


def exit_state_train_class_2(self, state, event):
    print("exit state: train class 2")


def state_train_class_2(self, state, event):
    global kpu, central_msg, bottom_msg, features, train_pic_cnt
    global state_machine
    print("current state: class 2")

    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return

    if train_pic_cnt == 0:
        features.append([])
        train_pic_cnt += 1
    elif train_pic_cnt <= max_train_pic:
        central_msg = None
        img = sensor.snapshot()
        feature = kpu.run_with_output(img, get_feature=True)
        features[1].append(feature)
        bottom_msg = "Class 2: #P{}".format(train_pic_cnt)
        train_pic_cnt += 1
    else:
        state_machine.emit_event(EVENT.EVENT_NEXT_MODE)


def enter_state_train_class_3(self, state, event):
    print("enter state: train class 3")
    global train_pic_cnt, central_msg, bottom_msg
    train_pic_cnt = 0
    central_msg = "Train class 3"
    bottom_msg = "Change to 3rd class please"


def exit_state_train_class_3(self, state, event):
    print("exit state: train class 3")


def state_train_class_3(self, state, event):
    global kpu, central_msg, bottom_msg, features, train_pic_cnt
    global state_machine
    print("current state: class 3")

    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return

    if train_pic_cnt == 0:
        features.append([])
        train_pic_cnt += 1
    elif train_pic_cnt <= max_train_pic:
        central_msg = None
        img = sensor.snapshot()
        feature = kpu.run_with_output(img, get_feature=True)
        features[2].append(feature)
        bottom_msg = "Class 3: #P{}".format(train_pic_cnt)
        train_pic_cnt += 1
    else:
        state_machine.emit_event(EVENT.EVENT_NEXT_MODE)
  • enter_state_train_class_X(X为1,2,3):这些函数在进入对应类别训练状态时被调用,重置图片计数器,并设置屏幕上显示的信息。

  • exit_state_train_class_X(X为1,2,3):这些函数在退出对应类别训练状态时被调用,目前只是打印一条退出信息。

  • state_train_class_X(X为1,2,3):这些函数处理在对应类别训练状态时的逻辑。它们检查是否长按BOOT键来重启状态机,或者在达到一定数量的训练图片后转换到下一个状态或模式。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def enter_state_classify(self, state, event):
    global central_msg, bottom_msg
    print("enter state: classify")
    central_msg = "Classification"
    bottom_msg = "Training complete! Start classification"



def exit_state_classify(self, state, event):
    print("exit state: classify")


def state_classify(self, state, event):
    global central_msg, bottom_msg
    print("current state: classify, {}, {}".format(state, event))
    if event == EVENT.BOOT_KEY:
        central_msg = None
    if event == EVENT.BOOT_KEY_LONG_PRESS:
        restart(self)
        return
  • enter_state_classify:在进入分类状态时设置屏幕显示信息。

  • exit_state_classify:在退出分类状态时被调用。

  • state_classify:处理在分类状态时的逻辑,包括对BOOT键的响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def event_power_on(self, value=None):
    print("emit event: power_on")


def event_press_boot_key(self, value=None):
    global state_machine
    print("emit event: boot_key")


def event_long_press_boot_key(self, value=None):
    global state_machine
    print("emit event: boot_key_long_press")
  • event_power_on、event_press_boot_key、event_long_press_boot_key:这些函数定义了如何响应不同的事件。例如,长按BOOT键会重启状态机。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# state action table format:
#   state: [enter_state_handler, execute_state_handler, exit_state_handler]
state_handlers = {
    STATE.IDLE: [enter_state_idle, state_idle, exit_state_idle],
    STATE.INIT: [enter_state_init, state_init, exit_state_init],
    STATE.TRAIN_CLASS_1: [enter_state_train_class_1, state_train_class_1, exit_state_train_class_1],
    STATE.TRAIN_CLASS_2: [enter_state_train_class_2, state_train_class_2, exit_state_train_class_2],
    STATE.TRAIN_CLASS_3: [enter_state_train_class_3, state_train_class_3, exit_state_train_class_3],
    STATE.CLASSIFY: [enter_state_classify, state_classify, exit_state_classify]
}

# event action table, can be enabled while needed
event_handlers = {
    EVENT.POWER_ON: event_power_on,
    EVENT.BOOT_KEY: event_press_boot_key,
    EVENT.BOOT_KEY_LONG_PRESS: event_long_press_boot_key
}

# Transition table
transitions = [
    [STATE.IDLE, STATE.INIT, EVENT.POWER_ON],
    [STATE.INIT, STATE.TRAIN_CLASS_1, EVENT.EVENT_NEXT_MODE],
    [STATE.TRAIN_CLASS_1, STATE.TRAIN_CLASS_2, EVENT.EVENT_NEXT_MODE],
    [STATE.TRAIN_CLASS_2, STATE.TRAIN_CLASS_3, EVENT.EVENT_NEXT_MODE],
    [STATE.TRAIN_CLASS_3, STATE.CLASSIFY, EVENT.EVENT_NEXT_MODE]
]
  • 状态处理表 state_handlers:这是一个字典,将每个状态映射到其对应的进入、执行和退出函数。

  • 事件处理表 event_handlers:这是另一个字典,将每个事件映射到其对应的事件处理函数。

  • 转换表 transitions:这是一个列表,定义了状态机如何从一个状态转换到另一个状态,基于发生的事件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Button(object):
    DEBOUNCE_THRESHOLD = 30  # 消抖阈值
    LONG_PRESS_THRESHOLD = 1000  # 长按阈值
    # Internal  key states
    IDLE = 0
    DEBOUNCE = 1
    SHORT_PRESS = 2
    LONG_PRESS = 3

    def __init__(self, state_machine):
        self._state = Button.IDLE
        self._key_ticks = 0
        self._pre_key_state = 1
        self.SHORT_PRESS_BUF = None
        self.st = state_machine

    def reset(self):
        self._state = Button.IDLE
        self._key_ticks = 0
        self._pre_key_state = 1
        self.SHORT_PRESS_BUF = None

    def key_up(self, delta):
        # print("up:{}".format(delta))
        # key up时,有缓存的key信息就发出去,没有的话直接复位状态
        if self.SHORT_PRESS_BUF:
            self.st.emit_event(self.SHORT_PRESS_BUF)
        self.reset()

    def key_down(self, delta):
        # print("dn:{},t:{}".format(delta, self._key_ticks))
        if self._state == Button.IDLE:
            self._key_ticks += delta
            if self._key_ticks > Button.DEBOUNCE_THRESHOLD:
                # main loop period过大时,会直接跳过去抖阶段
                self._state = Button.SHORT_PRESS
                self.SHORT_PRESS_BUF = EVENT.BOOT_KEY  # key_up 时发送
            else:
                self._state = Button.DEBOUNCE
        elif self._state == Button.DEBOUNCE:
            self._key_ticks += delta
            if self._key_ticks > Button.DEBOUNCE_THRESHOLD:
                self._state = Button.SHORT_PRESS
                self.SHORT_PRESS_BUF = EVENT.BOOT_KEY  # key_up 时发送
        elif self._state == Button.SHORT_PRESS:
            self._key_ticks += delta
            if self._key_ticks > Button.LONG_PRESS_THRESHOLD:
                self._state = Button.LONG_PRESS
                self.SHORT_PRESS_BUF = None  # 检测到长按,将之前可能存在的短按buffer清除,以防发两个key event出去
                self.st.emit_event(EVENT.BOOT_KEY_LONG_PRESS)
        elif self._state == Button.LONG_PRESS:
            self._key_ticks += delta
            # 最迟 LONG_PRESS 发出信号,再以后就忽略,不需要处理。key_up时再退出状态机。
            pass
        else:
            pass
  • 这个类用于处理按钮事件,包括消抖和长按检测。

  • DEBOUNCE_THRESHOLD:定义了消抖的时间阈值。

  • LONG_PRESS_THRESHOLD:定义了长按的时间阈值。

  • IDLE、DEBOUNCE、SHORT_PRESS、LONG_PRESS:这些是按钮的内部状态。

  • __init__:初始化按钮状态和计时器。

  • reset:重置按钮状态和计时器。

  • key_up:当按钮释放时调用,如果有缓存的短按事件就触发它,然后重置按钮状态。

  • key_down:当按钮按下时调用,根据按下时间处理消抖和长按逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# 未启用 state machine 的主循环,所以这里将需要循环执行的函数放在while True里
def loop_init():
    global lcd, img_init
    if state_machine.current_state != STATE.INIT:
        return

    img_init.draw_rectangle(0, 0, lcd.width(), lcd.height(), color=(0, 0, 255), fill=True, thickness=2)
    img_init.draw_string(65, 90, "Self Learning Demo", color=(255, 255, 255), scale=2)
    img_init.draw_string(5, 210, "short press:   next", color=(255, 255, 255), scale=1)
    img_init.draw_string(5, 225, "long press:      restart", color=(255, 255, 255), scale=1)
    lcd.display(img_init)


def loop_capture():
    global central_msg, bottom_msg
    img = sensor.snapshot()
    if central_msg:
        img.draw_rectangle(0, 90, lcd.width(), 22, color=(0, 0, 255), fill=True, thickness=2)
        img.draw_string(55, 90, central_msg, color=(255, 255, 255), scale=2)
    if bottom_msg:
        img.draw_string(5, 208, bottom_msg, color=(0, 0, 255), scale=1)
    lcd.display(img)


def loop_classify():
    global central_msg, bottom_msg
    img = sensor.snapshot()

    scores = []
    feature = kpu.run_with_output(img, get_feature=True)
    high = 0
    index = 0
    for j in range(len(features)):
        for f in features[j]:
            score = kpu.feature_compare(f, feature)
            if score > high:
                high = score
                index = j
    if high > THRESHOLD:
        bottom_msg = "class:{},score:{:2.1f}".format(index + 1, high)
    else:
        bottom_msg = None

    # display info
    if central_msg:
        print("central_msg:{}".format(central_msg))
        img.draw_rectangle(0, 90, lcd.width(), 22, color=(0, 255, 0), fill=True, thickness=2)
        img.draw_string(55, 90, central_msg, color=(255, 255, 255), scale=2)
    if bottom_msg:
        print("bottom_msg:{}".format(bottom_msg))
        img.draw_string(5, 208, bottom_msg, color=(0, 255, 0), scale=1)
    lcd.display(img)
  • loop_init:在STATE.INIT状态下,初始化LCD显示。

  • loop_capture:在训练状态下,捕获图像并在LCD上显示相关信息。

  • loop_classify:在分类状态下,对捕获的图像进行分类并在LCD上显示结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# main loop
features = []
THRESHOLD = 98.5    # 比对阈值,越大越严格
train_pic_cnt = 0   # 当前分类已训练图片数量
max_train_pic = 5   # 每个类别最多训练图片数量

central_msg = None  # 屏幕中间显示的信息
bottom_msg = None   # 屏幕底部显示的信息

fm.register(board_info.BOOT_KEY, fm.fpioa.GPIOHS0)
boot_gpio = GPIO(GPIO.GPIOHS0, GPIO.IN)

lcd.init()
sensor.reset(dual_buff=True)  # Reset and initialize the sensor. It will
# run automatically, call sensor.run(0) to stop
sensor.set_pixformat(sensor.RGB565)  # Set pixel format to RGB565 (or GRAYSCALE)
sensor.set_framesize(sensor.QVGA)  # Set frame size to QVGA (320x240)
sensor.set_windowing((224, 224))
sensor.set_vflip(1)
sensor.skip_frames(time=500)  # Wait for settings take effect.

kpu = KPU()
print("ready load model")
kpu.load_kmodel("/sd/KPU/self_learn_classifier/mb-0.25.kmodel")

state_machine = StateMachine(state_handlers, event_handlers, transitions)
state_machine.emit_event(EVENT.POWER_ON)

btn_ticks_prev = time.ticks_ms()
boot_btn = Button(state_machine)
while True:
    gc.collect()

    # 计算boot key被压下或弹起的时间,用于消抖和长按检测, 并且向state machine发送事件
    btn_ticks_cur = time.ticks_ms()
    delta = time.ticks_diff(btn_ticks_cur, btn_ticks_prev)
    btn_ticks_prev = btn_ticks_cur
    if boot_gpio.value() == 0:
        boot_btn.key_down(delta)
    else:
        boot_btn.key_up(delta)

    # 未启用 state machine 的主循环,所以这里将需要循环执行的函数放在while True里
    if state_machine.current_state == STATE.INIT:
        loop_init()

    elif state_machine.current_state == STATE.CLASSIFY:
        loop_classify()

    elif state_machine.current_state == STATE.TRAIN_CLASS_1 or state_machine.current_state == STATE.TRAIN_CLASS_2 \
            or state_machine.current_state == STATE.TRAIN_CLASS_3:
        loop_capture()
  • 初始化全局变量,如features、THRESHOLD、train_pic_cnt、max_train_pic、central_msg和bottom_msg。

  • 初始化LCD、传感器和KPU(Kendryte Processing Unit)。

  • 创建状态机实例并触发上电事件。

  • 在主循环中,收集垃圾、处理按钮事件、并根据当前状态调用相应的循环函数。