webhook

package module
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2023 License: MIT Imports: 2 Imported by: 1

README

webhook

webhook

✨ 你说得对,但是 webhook 是基于 weibo-webhook 改良的分布式博文收集终端 ✨

文档 · 下载

使用

下面是一个简易的提交脚本 ~(∠・ω< )

import asyncio
import json
from typing import Any, Tuple, Union

import httpx
from bilibili_api import Credential as _Credential
from bilibili_api import comment, live


class Credential(_Credential):
    def __init__(
        self,
        url: str,
        token: str | None = None,
        sessdata: str | None = None,
        bili_jct: str | None = None,
        dedeuserid: Union[str, None] = None,     
    ) -> None:
        super().__init__(sessdata=sessdata, bili_jct=bili_jct, dedeuserid=dedeuserid)
        self.session = httpx.AsyncClient(base_url=url)
        self.token = token if token is not None else ""

    async def __aenter__(self): return await self.check()

    async def __aexit__(self, type, value, trace): ...

    async def request(self, method, url, *args, **kwargs) -> Tuple[Any, Exception | None]:
        try:
            resp = await self.session.request(method, url, headers={"Authorization": self.token}, *args, **kwargs)
            assert resp.status_code == 200, "请求失败"
            result = resp.json()
            if result["code"] == 0:
                return result["data"], None
            return result, Exception(result["message"])
        except Exception as e:
            return None, e

    async def get(self, url, *args, **kwargs):
        return await self.request("GET", url, *args, **kwargs)
    
    async def post(self, url, *args, **kwargs):
        return await self.request("POST", url, *args, **kwargs)

    async def check(self):
        if self.dedeuserid is None:
            self.dedeuserid = await self.get_uid()
        if not await self.check_token():
            await self.get_token()
        return self

    async def check_token(self) -> bool:
        data, err = await self.get("/me")
        if err is None:
            return data["uid"] == str(self.dedeuserid)
        return False

    async def get_uid(self):
        info = await live.get_self_info(self)
        return str(info["uid"])

    async def get_token(self):
        data, err = await self.get("/token", params={"uid": self.dedeuserid})
        if err is not None:
            raise err
        resp = await self.send_comment(data["token"], data["oid"])
        if resp["success_action"] != 0:
            raise Exception("send comment failed")
        await self.register(data["auth"])

    async def send_comment(self, text: str, oid: int):
        return await comment.send_comment(text, oid, comment.CommentResourceType.DYNAMIC, credential=self)

    async def register(self, auth: str):
        self.token = ""
        for i in range(5):
            await asyncio.sleep(3)
            print(f"register {i}.")
            data, err = await self.get("/register", params={"Authorization": auth})
            if err is None:
                self.token = data
                break

    async def submit(self):
        data = {
            "mid":    "2",
            "time":   "1690442897",
            "text":   "你好李鑫",
            "source": "来自牛魔",
            "platform":    "bilibili",
            "uid":         "434334701",
            "create":      "1690442797",
            "name":        "七海Nana7mi",
            "face":        attachment("https://i2.hdslb.com/bfs/face/f261f5395f1f0082b106f7a23b9424a922b046bb.jpg"),
            "description": "大家好,测试这里",
            "follower":    "989643",
            "following":   "551",
            "attachments": [
                attachment("https://i1.hdslb.com/bfs/face/86faab4844dd2c45870fdafa8f2c9ce7be3e999f.jpg"),
                attachment("https://i2.hdslb.com/bfs/face/f261f5395f1f0082b106f7a23b9424a922b046bb.jpg"),
            ],
            "repost": {},
            "comments": [],
        }
        data, err = await self.post("/submit", data=data)
        print(data, err)


def dict2json(dic: dict = None, **kwargs) -> str:
    if dic is None:
        dic = {}
    dic.update(kwargs)
    return json.dumps(dic)


def attachment(url: str) -> str:
    return dict2json(url=url)


URL = "http://localhost:9000"
TOKEN = "********"
SESSDATA = ""
BILI_JET = ""


async def main():
    async with Credential(url=URL, token=TOKEN, sessdata=SESSDATA, bili_jct=BILI_JET) as cred:
        await cred.submit()


asyncio.run(main())

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Run

func Run(c *configs.Config)

启动!

func RunWithCycle added in v0.9.0

func RunWithCycle(c *configs.Config, cycle service.LifeCycle)

使用自定义 LifeCycle 启动

Types

type Config

type Config = configs.Config

type Github

type Github = configs.Github

type Path added in v0.4.1

type Path = configs.Path

Directories

Path Synopsis
cmd
webhook Module
db

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL