模仿telegram同步到logseq日志,按天区分,每天按分钟区分,效果如下
embed
bookmark
image
video
pdf
file
audio
code
equation
divider
breadcrumb
table_of_contents
link_to_page
table_row
column_list
column
table
heading_1
heading_2
heading_3
paragraph
bulleted_list_item
numbered_list_item
quote
to_do
toggle
template
callout
synced_block
telegram-api
import rich
import requests
from flask import Flask
from flask import request
from notion_api import insert
TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
app = Flask(__name__)
def get_file_path(file_id):
url = f"https://api.telegram.org/bot{TOKEN}/getFile?file_id={file_id}"
r = requests.get(url)
return r.json()["result"]["file_path"]
def get_file_url(file_path):
url = f"https://api.telegram.org/file/bot{TOKEN}/{file_path}"
return url
def send_message(chat_id, message):
url = f"https://api.telegram.org/bot{TOKEN}/sendMessage"
data = {"chat_id": chat_id, "text": message}
r = requests.post(url, json=data)
return r.status_code
@app.route("/notion_api", methods=["GET", "POST"])
def bot_api():
rich.print(request.json)
text = request.json["message"].get("text", "") or request.json["message"].get("caption", "")
chat_id = request.json["message"]["chat"]["id"]
if text == "/start":
send_message(chat_id, "绑定成功")
return ""
if request.json["message"].get("document"):
file_type = request.json["message"]["document"]["mime_type"]
file_id = request.json["message"]["document"]["file_id"]
file_url = get_file_url(get_file_path(file_id))
insert(text, file_type, file_url)
elif request.json["message"].get("photo"):
file_type = "image/jpeg"
file_id = request.json["message"]["photo"][-1]["file_id"]
file_url = get_file_url(get_file_path(file_id))
insert(text, file_type, file_url)
elif request.json["message"].get("video_note"):
file_type = "video/mp4"
file_id = request.json["message"]["video_note"]["file_id"]
file_url = get_file_url(get_file_path(file_id))
insert(text, file_type, file_url)
elif request.json["message"].get("voice"):
file_type = "audio/ogg"
file_id = request.json["message"]["voice"]["file_id"]
file_url = get_file_url(get_file_path(file_id))
insert(text, file_type, file_url)
else:
insert(text)
return ""
if __name__ == "__main__":
app.run(port=1236, threaded=True)
notion-api
import os
import time
import requests
from notion_client import Client
import datetime
from typing import Dict, Any
# Initialize Notion client
notion = Client(auth='xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx')
# Define database ID
database_id = "xxxxxxxxxxxxxxxxxxxxxxxxxx"
WEEKDAYS_DICT: Dict[str, str] = {
"Monday": "周一",
"Tuesday": "周二",
"Wednesday": "周三",
"Thursday": "周四",
"Friday": "周五",
"Saturday": "周六",
"Sunday": "周日",
}
def get_page(page_title: str) -> Dict[str, Any]:
"""
Retrieve a page from the database with the given title.
:param page_title: The title of the page to be retrieved.
:return: A dictionary containing information about the existing page.
"""
existing_page = notion.databases.query(
**{
"database_id": database_id,
"filter": {
"property": "title",
"title": {
"equals": page_title
}
}
}
)
return existing_page
def create_page(page_title: str, tag_value: str, tag_name: str = "tag") -> None:
"""
Create a new page in the database with the specified title and tag.
:param page_title: The title of the new page.
:param tag_value: The value of the tag to be added to the new page.
:param tag_name: The name of the tag property. Default is "tag".
"""
new_page = {
"title": {
"title": [{"text": {"content": page_title}}]
},
tag_name: {"multi_select": [{"name": tag_value}]},
}
new_page_content = [
{
"object": "block",
"type": "paragraph",
"paragraph": {
"rich_text": [
{
"type": "text",
"text": {
"content": page_title,
}
}
]
}
}
]
notion.pages.create(parent={"database_id": database_id}, properties=new_page, children=new_page_content)
def get_page_content(page_id: str) -> Dict[str, Any]:
"""
Retrieve the content of a page with the given page_id.
:param page_id: The ID of the page whose content is to be retrieved.
:return: A dictionary containing information about the page's content.
"""
content = notion.blocks.children.list(page_id)
return content
def append_page(block_id: str, text: str, file_type: str, file_url: str) -> None:
new_block = [
{
"object": "block",
"type": "bulleted_list_item",
"bulleted_list_item": {
"rich_text": [
{
"type": "text",
"text": {
"content": text,
}
}
]
}
}]
if file_url:
#file_content = requests.get(file_url).content
#uploaded_file = notion.files.upload(file_content)
if "image" in file_type:
new_block.append({
"object": "block",
"type": "image",
"image": {
"type": "external",
"external": {
"url": file_url
}
}
})
elif "video" in file_type:
new_block.append({
"object": "block",
"type": "video",
"video": {
"type": "external",
"external": {
"url": file_url
}
}
})
elif "audio" in file_type:
new_block.append({
"object": "block",
"type": "audio",
"audio": {
"type": "external",
"external": {
"url": file_url
}
}
})
else:
new_block.append({
"object": "block",
"type": "file",
"file": {
"type": "external",
"external": {
"url": file_url
}
}
})
notion.blocks.children.append(block_id, children=new_block, after=False)
def update_block(existing_page: Dict[str, Any], text: str, file_type: str, file_url: str) -> None:
page_id = existing_page["results"][0]["id"]
content = get_page_content(page_id)
block_id = content["results"][-1]["id"]
append_page(block_id, text, file_type, file_url)
def insert(text: str, file_type: str = "", file_url: str = "") -> None:
tag_name = "标签"
tag_value = "日志"
text = f"{time.ctime()[11:16]} - {text}" if text else time.ctime()[11:16]
current_date = datetime.datetime.now().strftime("%Y-%m-%d")
weekday = WEEKDAYS_DICT[datetime.datetime.now().strftime("%A")]
page_title = f"{current_date} {weekday}"
existing_page = get_page(page_title)
if not existing_page["results"]:
create_page(page_title, tag_value, tag_name)
existing_page = get_page(page_title)
update_block(existing_page, text, file_type, file_url)
if __name__ == "__main__":
insert("你感觉怎么样")