普通视图

发现新文章,点击刷新页面。
昨天以前Rehtt's Blog

【脚本】在jetson nano上使用摄像头分段录像并开启端口推流

作者 Rehtt
2026年1月8日 14:53

可设置录像保留天数、分段时间、分辨率、码率

#!/usr/bin/env python3
import sys
import os
import time
import datetime
import threading
import glob
import gi
import signal

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib, GObject

# ================= 配置 =================
CONFIG = {
    # 保存位置
    "save_dir": "/home/r/Videos/cctv",
    # 分段时长
    "segment_duration_ns": 60 * 1_000_000_000,  # 60s
    # 保留天数
    "retention_days": 7,
    # 剩余空间低于多少自动删除前面的录像
    "free_space_threshold": 50 * 1024 * 1024,  # 50MB
    # 端口
    "stream_port": 5000,
    # 摄像头地址
    "usb_device": "/dev/video0",
    # 分辨率
    "width": 1280,
    "height": 720,
    # 平均码率
    "bitrate": 2000000,
    # 峰值码率
    "peak-bitrate": 4000000,
}
# =======================================


class JetsonDVR:
    def __init__(self):
        # 初始化 GStreamer
        Gst.init(None)

        self.mainloop = GLib.MainLoop()
        os.makedirs(CONFIG["save_dir"], exist_ok=True)

        # 清理线程
        self.cleanup_thread = threading.Thread(target=self.cleanup_loop, daemon=True)
        self.cleanup_thread.start()

        self.pipeline = self.create_pipeline()

        # 监听总线消息
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_message)

        # 优雅退出的信号处理
        signal.signal(signal.SIGINT, self.handle_sigint)
        signal.signal(signal.SIGTERM, self.handle_sigint)

    def create_pipeline(self):
        print(f"[*] 初始化管道... 设备: {CONFIG['usb_device']}")

        source = (
            f"v4l2src device={CONFIG['usb_device']} ! "
            f"image/jpeg, width={CONFIG['width']}, height={CONFIG['height']}, framerate=30/1 ! "
            "jpegdec ! "
            "nvvidconv ! "
            "video/x-raw(memory:NVMM), format=I420"
        )

        #        encoder = (
        #            f" ! nvv4l2h264enc maxperf-enable=1 insert-sps-pps=1 bitrate={CONFIG['bitrate']} ! "
        #            "h264parse ! tee name=t"
        #        )

        # 使用 h265
        encoder = (
            " ! nvv4l2h265enc maxperf-enable=1 insert-sps-pps=1 "
            f"control-rate=1 bitrate={CONFIG['bitrate']} peak-bitrate={CONFIG['peak-bitrate']} ! "
            "video/x-h265,stream-format=byte-stream ! "
            "h265parse ! tee name=t"
        )

        record_branch = (
            f" t. ! queue ! "
            f"splitmuxsink name=sink "
            f"max-size-time={CONFIG['segment_duration_ns']} "
            f"muxer=mp4mux async-handling=true"
        )

        # Streaming Branch (推流):
        stream_branch = (
            f" t. ! queue leaky=1 ! "
            f"matroskamux streamable=true ! "
            f"tcpserversink host=0.0.0.0 port={CONFIG['stream_port']} "
            f"recover-policy=keyframe sync-method=latest-keyframe"
        )

        pipeline_str = source + encoder + record_branch + stream_branch

        print("-" * 60)
        print("GStreamer Pipeline String:")
        print(pipeline_str)
        print("-" * 60)

        try:
            pipeline = Gst.parse_launch(pipeline_str)
        except Exception as e:
            print("[!] 管道构建失败:", e)
            sys.exit(1)

        sink = pipeline.get_by_name("sink")
        if sink:
            sink.connect("format-location", self.on_format_location)
        else:
            print("[!] 警告: 找不到 splitmuxsink")

        return pipeline

    def on_format_location(self, splitmux, fragment_id):
        now = datetime.datetime.now()
        filename = now.strftime("%Y%m%d-%H%M%S.mp4")
        return os.path.join(CONFIG["save_dir"], filename)

    def cleanup_loop(self):
        retention_sec = CONFIG["retention_days"] * 24 * 3600
        while True:
            try:
                now = time.time()
                for f in glob.glob(os.path.join(CONFIG["save_dir"], "*.mp4")):
                    if os.path.isfile(f):
                        delete = False
                        if now - os.path.getmtime(f) > retention_sec:
                            delete = True
                        elif CONFIG["free_space_threshold"] > 0:
                            s_info = os.statvfs(CONFIG["save_dir"])
                            free_space = s_info.f_bsize * s_info.f_bavail
                            if free_space < CONFIG["free_space_threshold"]:
                                delete = True
                        if delete:
                            os.remove(f)
                            print(f"[-] 删除过期录像: {os.path.basename(f)}")
            except Exception as e:
                print("[!] 清理线程错误:", e)
            time.sleep(3600)

    def on_message(self, bus, message):
        mtype = message.type
        if mtype == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print("[!] Pipeline Error:", err)
            if debug:
                print("[!] Debug Info:", debug)
            self.quit()
        elif mtype == Gst.MessageType.EOS:
            print("[*] End of Stream")
            self.quit()

    def handle_sigint(self, sig, frame):
        print("\n[*] 接收到中断信号,正在停止...")
        self.quit()

    def quit(self):
        if self.mainloop.is_running():
            self.mainloop.quit()

    def run(self):
        print("[*] 启动 DVR 系统...")
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.mainloop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop_pipeline()

    def stop_pipeline(self):
        print("[*] 发送 EOS 并停止管道...")
        # 发送 EOS 以正确结束录像文件(防止 MP4 损坏)
        self.pipeline.send_event(Gst.Event.new_eos())
        # 给一点时间让 EOS 传播
        time.sleep(1)
        self.pipeline.set_state(Gst.State.NULL)
        print("[*] 已退出")


if __name__ == "__main__":
    dvr = JetsonDVR()
    dvr.run()

【杂项】ssh隐匿登录

作者 Rehtt
2025年8月13日 11:32

0x01 清除 Bash 命令历史记录

为防止命令行操作被记录,可在登录后立即禁用 Bash 的历史功能:

unset HISTORY HISTFILE HISTSAVE HISTLOG HISTZONE HISTORY HISTLOG
export HISTFILE=/dev/null
export HISTSIZE=0
export HISTFILESIZE=0

说明:

  • unset 命令用于清除可能影响历史记录的环境变量。
  • HISTFILE=/dev/null 将命令历史文件重定向至空设备,确保所有输入命令不会写入磁盘。
  • HISTSIZE=0HISTFILESIZE=0 分别禁用内存中的历史命令缓存和文件中的持久化存储。
建议:该配置可临时生效。若需永久禁用,可将相关设置写入用户的 shell 配置文件(如 .bashrc.profile),但应谨慎操作,避免影响正常运维。

0x02 隐藏登录 IP 地址

系统命令 last 读取 /var/log/lastlog 文件显示用户登录记录。此文件为二进制格式,无法直接编辑。以下是两种处理方式:

方法一:删除 lastlog 文件

sudo rm /var/log/lastlog
注意事项:
  • 删除该文件会清除所有用户的最近登录记录。
  • 部分系统在重启或执行某些认证操作时会自动重建该文件。
  • 此类操作可能触发日志完整性检测机制,引起管理员注意。
  • 不建议在生产环境中随意删除系统日志文件,尤其是未经审计许可的操作。

方法二:通过 SSH 端口转发隐藏真实 IP

使用本地端口转发建立代理连接:

ssh -N -L 2212:<目标主机IP>:22 root@<目标主机>

随后通过本地端口连接目标系统:

ssh -p 2212 root@127.0.0.1
原理:
  • 该命令将本地的 2212 端口映射到目标主机的 22 端口。
  • 登录时,系统记录的来源 IP 是 127.0.0.1,即本地回环地址,而非你的真实公网 IP。
  • lastwho 命令显示的将是隧道末端的地址,从而实现一定程度的身份隐藏。
补充说明:若目标系统启用了 PAM 审计模块或 auditd 等内核级审计工具,仍有可能记录原始连接信息。此方法仅适用于基础日志规避。

0x03 清理认证日志(auth.log)

认证相关的登录行为通常记录在 /var/log/auth.log(Debian/Ubuntu 系统)或 /var/log/secure(RHEL/CentOS 系统)中。可通过以下方式移除特定 IP 的记录。

删除包含指定 IP 的日志行:

sudo sed -i '/192\.168\.1\.1/d' /var/log/auth.log
注意:IP 地址中的点号需用反斜杠转义,否则会被当作正则表达式中的通配符处理。

替代方法:使用 grep 过滤并重写文件

sudo grep -v '192.168.1.100' /var/log/auth.log > /tmp/auth.log
sudo mv /tmp/auth.log /var/log/auth.log

对于使用 systemd 的系统:

还需清理 journald 的内存日志:

sudo journalctl --vacuum-time=1d

该命令会删除超过一天的日志条目,释放存储空间。

提醒

  • 修改认证日志属于高风险行为,容易被入侵检测系统(IDS)、安全信息与事件管理系统(SIEM)识别。
  • 现代安全架构普遍采用集中式日志收集(如 ELK、Splunk、rsyslog 转发等),即使本地日志被清除,远程日志服务器仍可能保留副本。
  • 任何日志篡改行为在未经授权的情况下均可能违反法律法规。

0x04 清除 wtmp 与 btmp 登录记录

Linux 系统中还有两个重要的二进制日志文件用于记录登录活动:

  • /var/log/wtmp:记录所有成功登录和登出的事件,last 命令读取此文件。
  • /var/log/btmp:记录失败的登录尝试,lastb 命令可查看其内容。

由于这两个文件为二进制格式,不能使用文本编辑器直接修改。

清空日志文件:

推荐使用 truncate 命令将文件大小置零:

sudo truncate -s 0 /var/log/wtmp
sudo truncate -s 0 /var/log/btmp

替代方式(谨慎使用):

也可删除后创建新的空文件,并恢复正确权限:

sudo rm /var/log/wtmp
sudo touch /var/log/wtmp
sudo chmod 644 /var/log/wtmp

btmp 文件操作类似。

提示truncate 是更安全的选择,避免因删除文件导致某些服务无法写入日志。

0x05 隐藏进程痕迹

运行扫描工具、反弹 shell 或提权程序时,进程名可能出现在 pstop 等监控命令输出中,暴露操作行为。

检测当前进程:

ps aux | grep python
top -b -n 1 | grep <your-tool>

隐藏进程名称:

利用 exec 系统调用覆盖当前进程名:

exec -a "sshd" /path/to/your-tool

执行后,该进程在 ps 输出中将显示为 sshd,具有一定迷惑性。

说明:这种方法仅修改进程的“显示名称”(argv[0]),不会改变实际的可执行文件路径或内存映像。高级监控工具(如 auditd、EDR 软件)仍可通过系统调用追踪其真实行为。

0x06 结语

本文介绍的技术主要用于理解系统日志机制和攻击者可能采取的规避手段,帮助系统管理员加强日志保护与入侵检测能力。需要强调的是:

  • 所有日志清理操作都应仅在授权测试环境或合法合规前提下进行。
  • 在真实生产系统中篡改或删除日志,不仅可能破坏系统稳定性,还可能触犯法律。
  • 强烈建议部署集中式日志审计、文件完整性监控(FIM)和实时告警机制,以防范未授权访问和痕迹清除行为。

了解攻击,是为了更好地防御。安全的本质,始终是攻防之间的持续博弈。

【杂项】常用正则匹配备忘(持续更新)

作者 Rehtt
2024年5月31日 16:20

找出没有包含“数值”的句子

^(?!数值).*&

解释:

  • ^ 表示行的开始。
  • (?=.*数值) 是正向前瞻,表示任意位置包含“数值”
  • (?!.*数值) 是负向前瞻,表示任意位置不包含“数值”
  • .* 匹配任意字符(除换行符外)
  • $ 表示行的结束
    (?!abc)[^abc]的区别:
    [^abc]匹配任何不是 abc 的单个字符
    (?!abc)匹配不是abc这个"词语"

【linux】目录

作者 Rehtt
2024年2月19日 10:09

0x00

发现自己以及周围的人关于linux的目录结构和作用还不怎么清楚,因此查了一些资料并记录下来

0x01

首先更正一点, /usr 目录很多人认为是user的意思,而usrUnix System Resource的缩写,是系统级目录

/usr/bin 为系统预装的一些可执行程序,所有用户都可用,随系统升级会改变
/usr/sbin 存放超级用户才能使用的应用程序
/usr/local为用户级目录,用户可以将自己编译的程序放在该目录的bin

/bin 存放所有用户皆可用的系统程序,系统启动或者系统修复时可用
/sbin 存放超级用户才能使用的系统程序

【Linux】使用Go开发PAM模块

作者 Rehtt
2023年10月20日 23:17

0x0 前言

PAM (Pluggable Authentication Modules) 系统的一部分,它是一个用于 Linux 系统认证的模块化框架。

如果给 ssh 登录添加 TOTP 验证一般会用到 google-authenticator 模块,这个模块就是实现和使用 PAM API 实现的。

PAM 不光可以在 ssh 登录时使用在其他大多数需要验证的时候也可以使用,甚至可以在自己程序中调用,总之 PAM 是一个非常好用的系统认证框架。

本篇文章将使用 Go 实现相关功能,而 PAM 的接口是用 C 语言实现的。因此将会使用 Go 的 CGO 进行编译,并且会尽量减少 C 语言的部分。

0x1 准备

因为 PAM 是 Linux 系统的认证框架,所以在 Linux 上开发最方便

首先安装依赖库:
Debian/Ubuntu

apt install libpam0g-dev

Centos

yum install pam-devel

安装后 include 文件在 /usr/include/security/ 目录下

0x2 开发 PAM 认证模块

首先在 Go 中引入 C 的库, Go 是在 import "C" 上方使用注释的方式引入 C 的内容

注意 import "C" 上方不能有空行

#cgo LDFLAGS: -lpamCGO 的命令,LDFLAGS: -lpam 是表示链接 PAM 静态库

typedef const char cchar_t; 是将 cchar_t 自定义为 const char 关键字,因为后面讲 C 转写成 GO 的时候 const xx 无法转写,因此需要提前定义一个关键字

#include "pam_prompt_wrapper.h" 是引入一个本地自定义的库,后面会后详解

/*
#cgo LDFLAGS: -lpam
#include <stdio.h>
#include <stdlib.h>
#include <security/pam_appl.h>
#include <security/pam_modules.h>
#include <security/pam_ext.h>
#include "pam_prompt_wrapper.h"
typedef const char cchar_t;
*/
import "C"

函数说明

pam_sm_authenticate是认证逻辑的接口函数,也是认证的核心,函数定义在pam_modules.h

接口函数是需要开发人员自己实现的函数,在函数内可以实现各种自定义的功能

函数签名为

int pam_sm_authenticate(pam_handle_t *pamh, int flags,
                        int argc, const char **argv);

使用 Go 实现,因为这个函数是要被外部调用的,因此需要在上方添加 //export 的注释,CGO 转写成 C 的时候会转成 extern 关键字

//export pam_sm_authenticate
func pam_sm_authenticate(pamh *C.pam_handle_t, flags C.int, argc C.int, argv **C.cchar_t) C.int {
    return C.PAM_SUCCESS
}

pam_sm_acct_mgmt是用户识别管理接口函数,用户判断用户否有权限,ssh登录中需要,函数定义也在pam_modules.h里,函数签名为

int pam_sm_acct_mgmt(pam_handle_t *pamh, int flags, int argc,
                                const char **argv);

使用 Go 实现

//export pam_sm_acct_mgmt
func pam_sm_acct_mgmt(pamh *C.pam_handle_t, flags C.int, argc C.int, argv **C.cchar_t) C.int {
    return C.PAM_SUCCESS
}

pam_prompt是认证过程中用于交互的函数,改函数直接调用,函数定义在 pam_ext.h 中,函数签名为

extern int PAM_FORMAT((printf, 4, 5)) PAM_NONNULL((1,4))
pam_prompt (pam_handle_t *pamh, int style, char **response,
        const char *fmt, ...);

注意 该函数中带有 ... 表示可以接受可变参数,而 CGO 中则不支持调用可变参数函数。因此需要使用 C 将该函数简单包装成一个固定参数的函数

int pam_prompt_wrapper (pam_handle_t *pamh, int style, char **response,const char *str) {
  return pam_prompt(pamh, style, response, "%s", str)
}

并且该函数要额外使用 C 文件存储,并在 Go 中引入

代码文件

pam_prompt_wrapper.h

#include <security/pam_appl.h>

int pam_prompt_wrapper(pam_handle_t *pamh, int style, char **response, const char *str);

pam_prompt_wrapper.c

#include <security/pam_appl.h>
#include <security/pam_ext.h>
#include <security/pam_modules.h>
#include <stdarg.h>

int pam_prompt_wrapper(pam_handle_t *pamh, int style, char **response, const char *str) {
    return pam_prompt(pamh, style, response, "%s", fmt);
}

main.go

package main

/*
#cgo LDFLAGS: -lpam
#include <stdio.h>
#include <stdlib.h>
#include <security/pam_appl.h>
#include <security/pam_modules.h>
#include <security/pam_ext.h>
#include "pam_prompt_wrapper.h"
typedef const char cchar_t;
*/
import "C"

import (
    "fmt"
    "os"
    "unsafe"
)

//export pam_sm_authenticate
func pam_sm_authenticate(pamh *C.pam_handle_t, flags C.int, argc C.int, argv **C.cchar_t) C.int {
    // 实现认证功能
  
    // 例如:
    // 认证时会显示 input >
    // 输入 123 后认证成功,否则失败
    a, _ := qa(pamh, false, "input >")
    if a == "123" {
        return C.PAM_SUCCESS
    }
  
    return C.PAM_AUTH_ERR
}

//export pam_sm_acct_mgmt
func pam_sm_acct_mgmt(pamh *C.pam_handle_t, flags C.int, argc C.int, argv **C.cchar_t) C.int {
      // 这里直接返回成功,也可以根据自己需求添加更多功能
      return C.PAM_SUCCESS
}

// 简单封装了交互函数
func qa(pamh *C.pam_handle_t, echo bool, query string) (string, C.int) {
    var f C.int
    f = C.PAM_PROMPT_ECHO_OFF
    if echo {
        f = C.PAM_PROMPT_ECHO_ON
    }
    input := C.CString("")
    defer C.free(unsafe.Pointer(input))
    prompt := C.CString(query)
    defer C.free(unsafe.Pointer(prompt))
    code := C.pam_prompt_wrapper(pamh, f, &input, prompt)
    return C.GoString(input), code
}

func main() {
}

编译

需要将 Go 编译成 C 的共享库

go build -buildmode=c-shared -o pam_example.so

0x3 使用

需要将模块放置到指定目录

sudo cp pam_example.so /lib/x86_64-linux-gnu/security

然后启动,这里展示在 ssh 登录验证中使用我们的模块

首先编辑 /etc/ssh/sshd_config ,修改 ChallengeResponseAuthenticationKbdInteractiveAuthentication 选项的值将其设为 yes

重启 sshd 服务 sudo service sshd restart

编辑 /etc/pam.d/sshd ,在文件添加 auth required pam_example.so

【Linux】使用cryptsetup进行luks加密

作者 Rehtt
2023年9月19日 17:38

0x00

LUKS (Linux Unified Key Setup)是 Linux 硬盘加密的标准。 通过提供标准的磁盘格式,它不仅可以促进发行版之间的兼容性,还可以提供对多个用户密码的安全管理。 与现有解决方案相比,LUKS 将所有必要的设置信息存储在分区信息首部中,使用户能够无缝传输或迁移其数据。

0x01 准备

sudo apt install cryptsetup

0x02 使用

加密磁盘分区

假设现在有磁盘分区 /dev/sdb2

  1. 格式化磁盘分区为luks
sudo cryptsetup luksFormat /dev/sdb2

输入大写YES
输入两次密码

这个时候会对磁盘分区进行全盘覆写,所以速度会比较慢

使用密钥文件(可选):

dd if=/dev/random of=keyfile bs=1K count=16  # 创建一个16k密钥文件,count可以大一点
sudo cryptsetup luksAddKey /dev/sdb2 keyfile # 使用密钥文件加密
  1. 映射加密分区

映射加密分区到jmsdb2,实际上是将加密分区映射到linux逻辑分区/dev/mapper/jmsdb2

sudo cryptsetup luksOpen /dev/sdb2 jmsdb2
  1. 格式化加密分区

之前在1中的格式化是将加密的物理分区格式化为luks,这个时候还需要对加密的逻辑分区格式化为常见的文件系统才可以使用

我这里示例选用ext4

sudo mkfs.ext4 /dev/mapper/jmsdb2
结构:
┌────────┐
│  luks  │
│┌──────┐│
││ ext4 ││
│└──────┘│
└────────┘
  1. 挂载分区

为了正常使用需要将逻辑分区挂载出来作为一个普通目录进行读写
我这挂在到/mnt/jmsdb2

# sudo mkdir /mnt/jmsdb2 # 如果有创建/mnt/jmsdb2可以跳过这一步
sudo mount /dev/mapper/jmsdb2 /mnt/jmsdb2

至此可以正常在 /mnt/jmsdb2 写数据了

  1. 卸载分区

如果不想使用了可以对分区进行卸载

sudo umont /mnt/jmsdb2  # 卸载逻辑分区
sudo cryptsetup luksClose jmsdb2  #关闭加密分区

创建加密文件

如果不想对整个分区加密,那可以在普通磁盘里创建一个加密文件,这样的好处是可以随意移动加密文件

dd if=/dev/zero of=/home/rehtt/cryptfile bs=1M count=1024

这里在/home/rehtt/下创建了一个名为cryptfile的文件,块大小(bs)为1M,共有(count)1024个块,也就是1024M

将上面的加密磁盘分区中的/dev/sdb2替换为/home/rehtt/cryptfile再跟着步骤即可

开机自动挂载

首先要使用密钥加密(上述第1步)

编辑/etc/crypttab

# <target name> <source device>    <key file>      <options>
jmsdb2 /dev/sdb2 /home/rehtt/keyfile luks

编辑/etc/fstab

/dev/mapper/jmsdb2 /mnt/jmsdb2 ext4  0 0 

【脚本】使用Python对socat简单的控制

作者 Rehtt
2023年9月12日 16:11

Use

python3 socat.py add 8080 192.168.137.1:8000

python3 socat.py del 8080 192.168.137.1:8000

python3 socat.py list

python3 socat.py -h

Code

import argparse
import os
import re
import subprocess


def get_dict(d, key):
    if type(d) is not dict:
        raise Exception('not dict')
    dd = d.get(key)
    if dd is None:
        return ''
    return dd


def socat_port():
    output = subprocess.check_output("ps aux | grep socat", shell=True, universal_newlines=True)

    out = []
    for line in output.splitlines():
        match = re.compile(r'socat (\w+)-LISTEN:(\d+)').search(line)
        if not match:
            continue
        data = {
            'listen_protocol': match.group(1).upper(),
            'listen_port': match.group(2)
        }

        match = re.compile(r'(\w+):([^:]+):(\d+)$').search(line)
        if not match:
            continue
        data['remote_protocol'] = match.group(1).upper()
        data['remote_host'] = match.group(2)
        data['remote_port'] = match.group(3)

        match = re.compile(r'(\d+)').search(line)
        if not match:
            continue
        data['listen_pid'] = match.group(1)

        match = re.compile(r',range=([^,]+),').search(line)
        if match:
            data['listen_range'] = match.group(1)
        match = re.compile(r',bind=([^,]+),').search(line)
        if match:
            data['listen_bind'] = match.group(1)
        out.append(data)

    return out


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-r', '--range', help='监听网段范围,例如:192.168.1.0/24')
    parser.add_argument('-b', '--bind', help='绑定监听地址,例如:192.168.1.1')
    parser.add_argument('-p', '--protocol', default='all', help='[all | tcp | udp]')
    parser.add_argument('positional', nargs='*',
                        help='<add | del | list> <本地端口> <远程地址>,例如:add 8000 192.168.1.2:8080')

    args = parser.parse_args()

    try:
        operation = args.positional[0]
        if operation != 'list':
            if len(args.positional) != 3:
                raise Exception()

            port = int(args.positional[1])
            raddr = args.positional[2]

            if not 0 < port < 1 << 16:
                print('端口不正确')
                exit(1)
    except:
        parser.print_usage()
        exit(1)

    if operation == 'add':
        command = 'socat {protocol}-LISTEN:{port},{option}fork,reuseaddr {protocol}:{raddr} &'
        option = ''
        if args.range is not None:
            option += 'range=%s,' % args.range
        if args.bind is not None:
            option += 'bind=%s,' % args.bind

        commands = []
        if args.protocol == 'all':
            commands.append({'protocol': 'TCP',
                             'command': command.format(protocol='TCP', port=port, option=option, raddr=raddr)})
            commands.append({'protocol': 'UDP',
                             'command': command.format(protocol='UDP', port=port, option=option, raddr=raddr)})
        elif args.protocol == 'tcp':
            commands.append({'protocol': 'TCP',
                             'command': command.format(protocol='TCP', port=port, option=option, raddr=raddr)})
        elif args.protocol == 'udp':
            commands.append({'protocol': 'UDP',
                             'command': command.format(protocol='UDP', port=port, option=option, raddr=raddr)})
        list = socat_port()
        for c in commands:
            bk = False
            for data in list:
                if data.get('listen_port') == str(c.get('port')) and data.get('listen_protocol') == c.get(
                        'protocol').upper():
                    bk = True
                    break
            if bk:
                print('{protocol} {port} 已被占用'.format(protocol=c.get('protocol'), port=c.get('port')))
                continue
            os.system(c.get('command'))
    elif operation == 'del':
        list = socat_port()
        command = 'kill %s'
        commands = []
        for data in list:
            if args.protocol != 'all' and data.get('listen_protocol') != args.protocol:
                continue
            if str(port) != data.get('listen_port'):
                continue
            commands.append(command % data.get('listen_pid'))

        for c in commands:
            os.system(c)
    elif operation == 'list':
        layout = '{listen_protocol}\t{listen_port}\t{listen_bind}\t{listen_range}\t{remote}'
        # print(layout.format(listen_protocol=''))
        for data in socat_port():
            print(layout.format(
                listen_protocol=get_dict(data, 'listen_protocol'),
                listen_port=get_dict(data, 'listen_port'),
                listen_bind=get_dict(data, 'listen_bind'),
                listen_range=get_dict(data, 'listen_range'),
                remote='%s:%s:%s' % (
                    get_dict(data, 'remote_protocol'), get_dict(data, 'remote_host'), get_dict(data, 'remote_port')),
            ))
    else:
        print('参数不正确')
        parser.print_usage()
        exit(1)

【脚本】ADB调用摄像头拍照并处理图片

作者 Rehtt
2023年6月29日 09:20

需安装:jpegoptim

#!/bin/bash

# 预处理目录
pre_dir=/mnt/sda/cam/pre
# 最终保存目录
dir=/mnt/sda/cam

mkdir $pre_dir 2&>1 > /dev/null
mkdir $dir 2&>1 > /dev/null

# 【可选】上报电池信息,用于自动化充电
# adb shell dumpsys battery | grep -E 'level|AC' |
#     sed ':a;N;s/\n/,/g;ta' - |
#     sed 's/AC powered/AC/g' - |
#     sed 's/ //g' - |
#     sed 's/,/,"/g' - |
#     sed 's/:/":/g' - |
#     sed 's/^|&/"/g' - |
#     sed 's/^/{"/g' - |
#     sed 's/$/}/g' - |
#     xargs -I value -o mosquitto_pub -t 'phone/battery/level' -h 192.168.31.110 -p 9883 -m 'value'

    
adb shell input keyevent 26
sleep 0.5
adb shell am start -a android.media.action.STILL_IMAGE_CAMERA
sleep 0.5
adb shell input tap 539 900
sleep 1
adb shell input keyevent 27
sleep 1
adb shell input keyevent 4
adb shell input keyevent 26

adb shell find /sdcard/DCIM/Camera -name "*.jpg" | xargs -o -I path adb pull path $pre_dir
adb shell rm /sdcard/DCIM/Camera/*

# 【可选】压缩图片,修改size可控制图片大小
find $pre_dir -name "*.jpg" | xargs jpegoptim --size=500k

mv $pre_dir/* $dir

<br/>

python3版本:

添加了对屏幕是否亮屏的检测

#!/bin/python3

import os
import re
import json
import time
adb_id = '192.168.31.129:5555'
pre_dir = '/mnt/sda/cam/pre/'
dir = '/mnt/sda/cam/'

try:
    os.mkdir(dir)
except:
    pass
try:
    os.mkdir(pre_dir)
except:
    pass


def cmd(command, display=True):
    if display:
        print(command)
    out = os.popen(command).read()
    if display:
        print(out)
    return out


def adb(shell, display=True):
    if not shell.startswith('adb'):
        shell = 'adb %s' % (shell)
    return cmd(shell.replace('adb', 'adb -s %s' % (adb_id)), display)


def is_screenState():
    return True if 'state=ON' in adb('shell dumpsys power | grep "Display Power: state="') else False


def open_screen():
    if not is_screenState():
        adb('shell input keyevent 26')


def close_screen():
    if is_screenState():
        adb('shell input keyevent 26')

# 发送电量数据


def send_power_status():
    info = adb('shell dumpsys battery', False)
    ac = True if re.compile(
        r'AC powered: ([^\n]+)').findall(info)[0] == 'true' else False
    level = int(re.compile(r'level: ([0-9]+)').findall(info)[0])
    j = json.dumps({"level": level, "AC": ac})
    os.popen(
        'mosquitto_pub -t \'phone/oppo/battery\' -h 192.168.31.110 -p 9883 -m \'%s\'' % (j))


send_power_status()
open_screen()
adb('shell am start -a android.media.action.STILL_IMAGE_CAMERA')
time.sleep(0.5)
adb('shell input tap 539 900')
time.sleep(1)
adb('shell input keyevent 27')
time.sleep(1)
adb('shell input keyevent 4')
close_screen()

for path in adb('adb shell find /sdcard/DCIM/Camera -name "*.jpg"').split('\n'):
    if path == '':
        continue
    adb('adb pull %s %s' % (path, pre_dir))
adb('adb shell rm /sdcard/DCIM/Camera/*')
for path in cmd('find %s -name "*.jpg"' % (pre_dir)).split('\n'):
    if path == '':
        continue
    cmd('jpegoptim --size=250k %s' % (path))
cmd('mv %s/* %s' % (pre_dir, dir))

【Golang】反射

作者 Rehtt
2023年6月14日 17:32

0x00

程序正常编译时变量会被转换为内存地址,而变量名、类型信息等不会被编译进可执行部分。而使用反射(golang中使用reflect包)则会将这些信息编译进可执行文件(所以有时为了程序安全而不会使用反射)。

0x01 实例

贴上一个经常用的xlsx文件转[]*struct代码

import (
    "errors"
    "github.com/tealeg/xlsx"
    "io"
    "net/http"
    "reflect"
)
// ParseXlsxToStruct
//
//     @Description: 解析xlsx到struct中
//     @param data    xlsx数据
//     @param out    目标struct
//     @param sheet 默认第一个sheet
//     @return error
//        ParseXlsxToStruct(data,&out)
func ParseXlsxToStruct(data []byte, out any, sheet ...string) error {
    value := reflect.ValueOf(out)
    if value.Type().Kind() != reflect.Ptr {
        return errors.New("必须以指针的方式传入")
    }
    value = value.Elem()
    if value.Type().Kind() != reflect.Slice {
        return errors.New("内部必须为struct切片")
    }
    if value.Len() != 0 {
        return errors.New("必须是空切片")
    }

    xfile, err := xlsx.OpenBinary(data)
    if err != nil {
        return err
    }
    if len(xfile.Sheets) == 0 {
        return errors.New("xlsx中没有sheet")
    }
    var sheets []*xlsx.Sheet
    if len(sheet) > 0 {
        sheets = make([]*xlsx.Sheet, 0, len(sheet))
        for _, v := range sheet {
            if s, ok := xfile.Sheet[v]; ok {
                sheets = append(sheets, s)
            }
        }
    } else {
        sheets = append(sheets, xfile.Sheets[0])
    }

    title := GetUniqueSheetsTitle(sheets)
    var tmp = make(map[string][]string, len(title))
    for _, s := range sheets {
        if len(s.Rows) == 0 {
            continue
        }
        title := GetSheetTitle(s)
        for i, row := range s.Rows {
            if i == 0 {
                continue
            }
            var dataMap = make(map[string]string)
            for j, cell := range row.Cells {
                if title[j] == "" {
                    continue
                }
                dataMap[title[j]] = cell.String()
            }
            for _, t := range title {
                if t == "" {
                    continue
                }
                tmp[t] = append(tmp[t], dataMap[t])
            }
        }
    }
    MapToStructByTag(tmp, out, "xlsx")
    return nil
}

func MapToStructByTag(data map[string][]string, in interface{}, tagKey string) {
    // 获取传入结构体切片的反射类型和值
    inValue := reflect.ValueOf(in).Elem()
    inType := inValue.Type().Elem()

    var l int
    for _, v := range data {
        if l < len(v) {
            l = len(v)
        }
    }
    //fmt.Println(inType)
    news := reflect.MakeSlice(inValue.Type(), l, l)
    defer func() {
        inValue.Set(news)
    }()

    for i := 0; i < news.Len(); i++ {
        structValue := news.Index(i)
        structType := inType
        // 获取结构体指针所指向的类型
        var prt bool
        var structValuePtr reflect.Value
        if structValue.Kind() == reflect.Ptr {
            structType = structType.Elem()
            structValuePtr = reflect.New(structType)
            structValue = structValuePtr.Elem()
            prt = true
        }

        // 遍历结构体字段
        for j := 0; j < structType.NumField(); j++ {
            fieldValue := structValue.Field(j)
            fieldType := structType.Field(j)
            // 检查字段的 tag 是否与 key 匹配
            tag := fieldType.Tag.Get(tagKey)
            if values, ok := data[tag]; ok {
                if i >= len(values) {
                    continue
                }
                // 检查字段是否可以被设置
                if fieldValue.CanSet() {
                    // 根据字段类型进行适当的转换并设置值
                    switch fieldType.Type.Kind() {
                    case reflect.String:
                        structValue.Field(j).SetString(values[i])
                    }
                }
            }

        }
        if prt {
            news.Index(i).Set(structValuePtr)
        }
    }
}

func ParseXlsxToStructByUrl(url string, out any, sheet ...string) error {
    resp, err := http.Get(url)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    data, _ := io.ReadAll(resp.Body)
    return ParseXlsxToStruct(data, out, sheet...)
}

func ParseStructToXlsx(a any, fn func(tag string, val any, cell *xlsx.Cell) error) (*xlsx.File, error) {
    val := reflect.ValueOf(a)
    if val.Type().Kind() != reflect.Slice {
        return nil, errors.New("必须传入[]*struct")
    }
    xFile := xlsx.NewFile()
    sheet, _ := xFile.AddSheet("A1")
    title := sheet.AddRow()
    for i := 0; i < val.Len(); i++ {
        elemVal := val.Index(i)
        for elemVal.Kind() == reflect.Ptr {
            elemVal = elemVal.Elem()
        }
        if elemVal.Kind() != reflect.Struct {
            return nil, errors.New("必须传入[]*struct")
        }
        row := sheet.AddRow()
        for j := 0; j < elemVal.NumField(); j++ {
            cell := row.AddCell()
            tag, ok := elemVal.Type().Field(j).Tag.Lookup("xlsx")
            if !ok {
                continue
            }
            if i == 0 {
                title.AddCell().SetString(tag)
            }
            v := elemVal.Field(j).Interface()
            if err := fn(tag, v, cell); err != nil {
                return nil, err
            }
        }
    }
    return xFile, nil
}

func GetUniqueSheetsTitle(ss []*xlsx.Sheet) []string {
    var tmp = make(map[string]struct{})
    var out []string
    for _, s := range ss {
        title := GetSheetTitle(s)
        for _, t := range title {
            if _, ok := tmp[t]; !ok && t != "" {
                out = append(out, t)
            }
        }
    }
    return out
}

func GetSheetTitle(s *xlsx.Sheet) []string {
    if len(s.Rows) == 0 {
        return nil
    }
    if row := s.Rows[0]; row != nil {
        var title = make([]string, len(row.Cells))
        for i, cell := range row.Cells {
            title[i] = cell.String()
        }
        return title
    }
    return nil
}

【疑难杂症】特权lxc下运行非特权docker报错

作者 Rehtt
2023年2月24日 08:53

0x0 问题

报错类似:

AppArmor enabled on system but the docker-default profile could not be loaded: running `/usr/sbin/apparmor_parser apparmor_parser

0x1 解决

在lxc的配置文件添加:

lxc.apparmor.profile = unconfined

添加后重启lxc

docker运行参数添加:

--security-opt apparmor=unconfined

如果是以docker-compose,添加:

services:
  your_service:
    security_opt:
      - apparmor=unconfined

【N5105】PVE+Docker+Jellyfin 核显直通

作者 Rehtt
2023年2月19日 10:41

0x00

当前我使用的PVE版本是7.3-3当前的内核是5.15.74,网上教程试了一圈发现都不行。最后发现问题是内核的原因。

0x01

首先升级宿主机内核

apt update
apt upgrade -y
apt install pve-kernel-5.19 -y

查看固件

cd /lib/firmware/i915 && ls ehl_guc*.bin &&  ls ehl_huc*.bin &&  ls icl_dmc*.bin

修改配置并重启

echo "options i915 enable_guc=3" >> /etc/modprobe.d/i915.conf
reboot

启动后检查

journalctl -b -o short-monotonic -k | egrep -i "i915|dmr|dmc|guc|huc"

输出的内容中有GuV HuC就代表驱动成功

[    4.841415] home kernel: Setting dangerous option enable_guc - tainting kernel
[    4.866046] home kernel: i915 0000:00:02.0: vgaarb: deactivate vga console
[    4.869642] home kernel: i915 0000:00:02.0: vgaarb: changed VGA decodes: olddecodes=io+mem,decodes=io+mem:owns=io+mem
[    4.871436] home kernel: i915 0000:00:02.0: [drm] Finished loading DMC firmware i915/icl_dmc_ver1_09.bin (v1.9)
[    4.876510] home kernel: mei_hdcp 0000:00:16.0-b638ab7e-94e2-4ea2-a552-d1c54b627f04: bound 0000:00:02.0 (ops i915_hdcp_component_ops [i915])
[    5.545198] home kernel: i915 0000:00:02.0: [drm] failed to retrieve link info, disabling eDP
[    5.552915] home kernel: i915 0000:00:02.0: [drm] GuC firmware i915/ehl_guc_70.1.1.bin version 70.1
[    5.552922] home kernel: i915 0000:00:02.0: [drm] HuC firmware i915/ehl_huc_9.0.0.bin version 9.0
[    5.569445] home kernel: i915 0000:00:02.0: [drm] HuC authenticated
[    5.569889] home kernel: i915 0000:00:02.0: [drm] GuC submission enabled
[    5.569891] home kernel: i915 0000:00:02.0: [drm] GuC SLPC disabled
[    5.572145] home kernel: [drm] Initialized i915 1.6.0 20201103 for 0000:00:02.0 on minor 0
[    5.575832] home kernel: sof-audio-pci-intel-icl 0000:00:1f.3: bound 0000:00:02.0 (ops i915_audio_component_bind_ops [i915])
[    5.576361] home kernel: i915 0000:00:02.0: [drm] Cannot find any crtc or sizes
[    5.576501] home kernel: i915 0000:00:02.0: [drm] Cannot find any crtc or sizes

0x02

docker映射设备/dev/dri:/dev/dri

jellyfin 硬解选择QSV

如果中间还加了一层LXC就在配置上添加

lxc.cgroup2.devices.allow: c 226:0 rwm
lxc.cgroup2.devices.allow: c 226:128 rwm
lxc.cgroup2.devices.allow: c 29:0 rwm
lxc.mount.entry: /dev/dri dev/dri none bind,optional,create=dir
lxc.apparmor.profile: unconfined

【Golang】调用syscall调用windows dll弹窗

作者 Rehtt
2022年9月22日 10:47
package main
import (
    "fmt"
    "syscall"
    "time"
    "unsafe"
)
 
const (
    MB_OK                = 0x00000000
    MB_OKCANCEL          = 0x00000001
    MB_ABORTRETRYIGNORE  = 0x00000002
    MB_YESNOCANCEL       = 0x00000003
    MB_YESNO             = 0x00000004
    MB_RETRYCANCEL       = 0x00000005
    MB_CANCELTRYCONTINUE = 0x00000006
    MB_ICONHAND          = 0x00000010
    MB_ICONQUESTION      = 0x00000020
    MB_ICONEXCLAMATION   = 0x00000030
    MB_ICONASTERISK      = 0x00000040
    MB_USERICON          = 0x00000080
    MB_ICONWARNING       = MB_ICONEXCLAMATION
    MB_ICONERROR         = MB_ICONHAND
    MB_ICONINFORMATION   = MB_ICONASTERISK
    MB_ICONSTOP          = MB_ICONHAND
 
    MB_DEFBUTTON1 = 0x00000000
    MB_DEFBUTTON2 = 0x00000100
    MB_DEFBUTTON3 = 0x00000200
    MB_DEFBUTTON4 = 0x00000300
)
 
func abort(funcname string, err syscall.Errno) {
    panic(funcname + " failed: " + err.Error())
}
 
var (
    user32, _     = syscall.LoadLibrary("user32.dll")
    messageBox, _ = syscall.GetProcAddress(user32, "MessageBoxW")
)
 
func IntPtr(n int) uintptr {
    return uintptr(n)
}
 
func StrPtr(s string) uintptr {
    return uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(s)))
}
 
func MessageBox(caption, text string, style uintptr) (result int) {
    ret, _, callErr := syscall.Syscall9(messageBox,
        4,
        0,
        StrPtr(text),
        StrPtr(caption),
        style,
        0, 0, 0, 0, 0)
    if callErr != 0 {
        abort("Call MessageBox", callErr)
    }
    result = int(ret)
    return
}
 
func ShowMessage2(title, text string) {
    user32 := syscall.NewLazyDLL("user32.dll")
    MessageBoxW := user32.NewProc("MessageBoxW")
    MessageBoxW.Call(IntPtr(0), StrPtr(text), StrPtr(title), IntPtr(0))
}
 
func main() {
    defer syscall.FreeLibrary(user32)
 
    num := MessageBox("Title1", "test1", MB_YESNOCANCEL)
    fmt.Printf("Get Retrun Value Before MessageBox %d\n", num)
    ShowMessage2("Title2", "test2")
    time.Sleep(3 * time.Second)
}
 
func init() {
    fmt.Print("start...\n")
}
❌
❌