本文篇幅较长,以代码+注释为主、文字内容为辅,主要是记录一下从 Hugo 迁移到 Typecho 的折腾过程。略过 VPS 安装 Typecho 的操作,就从整理本地的博客文章 Markdown 文件开始说起吧。

整个操作需要使用到以下几个 Python 库,在这里统一说明,后续代码中不再引入。

import os
import re
import time
import sqlite3
import pandas as pd

批量修改 Markdown 图片链接(可选)

我的 Markdown 文件和图片是原本是在同级目录下,为了方便后续博客图片的管理,这次顺便把图片也集中了起来。图片位置变更后,需要同步修改 Markdown 文件中图片链接。

几个关键的变量:

  • rule_img:匹配图片链接的正则表达式
  • md_path:原 Markdown 存放位置
  • out_path:修改后的 Markdown 存放位置
  • old_url:原图片链接
  • new_url:新的图片链接
md_path = r"E:\test\posts"
out_path = r"E:\test\posts_output"
files = [os.path.join(md_path, f) for f in os.listdir(md_path)]

def update_md(files: list) -> None:
    """批量修改 markdown 文件中的图片链接"""
    rule_header = re.compile("---.*?---", re.DOTALL)
    rule_img = re.compile("!\[.*?\]\((.*?)\)")

    for file in files:
        filename = os.path.split(file)[1]
        print(f"正在处理:{filename}")

        with open(file=file, mode='r', encoding='utf-8') as f:
            md = f.read()

            # 如过不需要修改 Front Matter 中的链接
            # 可以先把这部分提取出来,后续再拼接回去
            header = rule_header.findall(md)[0]
            # 提取内容
            content = md.replace(header, "")

            # 修改内容中的图片链接
            imgs = rule_img.findall(content)
            if imgs:
                for img in imgs:
                    old_url = f"]({img})"
                    new_url = f'](attachments/{img})'
                    content = content.replace(old_url, new_url)
  
            text = f"{header}{content}"

            with open(file=os.path.join(out_path, filename), mode="a", encoding="utf-8") as f2:
                f2.write(text)

    print("处理完成!")

1. 整理文章的属性和内容

从 Markdown 文件的 Front Matter 部分提取文章的分类、标签、发布日期、最后修改日期等信息。以我的一篇文章为例,Front Matter 部分的结构如下:

---
title: "开博第一篇"
slug: "my-first-post"
date: 2012-09-11T23:29:00+08:00
lastmod: 2017-08-13T20:27:31+08:00
keywords: ""
description: ""

categories: ["Coding"]
tags: ["Blog"]
featuredImage: ""
toc: false
---

通过正则表达式提取所有文章的要素和内容,构造出 Typecho 数据库中 content 表的数据,关键字段信息如下表。

键名类型解释
titlevarchar(200)内容标题
slugvarchar(200)内容缩略名
createdint(10)内容生成时的GMT unix时间戳
modifiedint(10)内容更改时的GMT unix时间戳
texttext内容文字
orderint(10)排序
authorIdint(10)内容所属用户id
typevarchar(16)内容类别
statusvarchar(16)内容状态
allowCommentchar(1)是否允许评论
allowPingchar(1)是否允许ping
allowFeedchar(1)允许出现在聚合中
parentint(10)父级
md_path = r"E:\content\posts"  # Markdown 文件夹
files = [os.path.join(md_path, f) for f in os.listdir(md_path)]

def get_content(files: list) -> tuple:
    """获取所有文章的要素"""

    rule_header = re.compile("---.*?---", re.DOTALL)
    rule_title = re.compile('title: "(.*?)"', re.DOTALL)

    posts = []
    categories = []
    tags = []

    for file in files:
        with open(file=file, mode='r', encoding='utf-8') as f:
            md = f.read()

            # 匹配 Front Matter
            header = rule_header.findall(md)[0]

            # 提取 title, slug, created, modified, category, tag
            title = rule_title.findall(header)[0]
            slug = re.findall('slug: "(.*?)"', header)[0]
            created = int(time.mktime(time.strptime(re.findall('date: (.*?)\+08:00', header)[0].replace("T", " "), "%Y-%m-%d %H:%M:%S")))
            modified = int(time.mktime(time.strptime(re.findall('lastmod: (.*?)\+08:00', header)[0].replace("T", " "), "%Y-%m-%d %H:%M:%S")))
            category = re.findall('categories: \["(.*?)"\]', header)

            tag = []
            if re.findall('tags: \[(.*?)\]', header):
                tag = re.findall('tags: \[(.*?)\]', header)[0].replace('"', "").replace(" ", "").split(",")

            # 提取 content, 并去除开头的空格
            content = md.replace(header, "")
            content = re.sub("^\s*", "", content)

            # 查找 Markdown 中是否有一级标题, 如有则去除
            # 我习惯用一级标题写文章标题, Typecho 的 content 中不需要
            rule_content_title = re.compile(f"#\s{title}\s*")
            waitforclear = rule_content_title.findall(content)
            if waitforclear:
                content = content.replace(waitforclear[0], "")
  
            # Typecho 的 content 中 Markdown 需要以 <!--markdown--> 开头
            text = f"<!--markdown-->{content}"
  
        # 区分内容类型: post 或 page
        if title in ["关于"]:
            post_type = "page"
        else:
            post_type = "post"

        post = title, slug, created, modified, text, 0, 1, post_type, 'publish', '1', '1', '1', 0
        posts.append(post)
        categories.append(category)
        tags.append(tag)
  
    print("文章要素获取完成!")

    return posts, categories, tags

2. 将文章的属性和内容写入数据库

将第 1 步中 get_content 函数返回值中的 posts 和 Typecho 数据库传入下面的 insert_content 函数,将文章内容批量写入 contents 表。

def insert_content(posts: list, db: str) -> None:
    """数据库写入文章内容"""
    with sqlite3.connect(db) as conn:
        cur = conn.cursor()
        sql_contents = """insert into typecho_contents
            (title, slug, created, modified, text, "order", authorId, type, status, allowComment, allowPing, allowFeed, parent)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"""
        cur.executemany(sql_contents, posts)
    print("文章写入完成!")

到这里 contents 表的操作基本结束,还有一个 commentsNum 字段,记录文章的评论数,需要等后面写入评论数据后再来更新。

再将第 1 步中 get_content 函数返回值中的 tags 和 Typecho 数据库传入下面的 insert_metas 函数,将文章的分类和标签批量写入 metas 表。

因为我的分类比较少,就偷个懒直接手工构造分类的数据了,如果你的分类比较多,可以参照 tags 的构造方法,使用第 1 步中 get_content 函数返回值中的 categories 进行构造。

def insert_metas(tags: list, db: str) -> None:
    """数据库写入文章分类和标签"""
    # 字段顺序: name, slug, type, description, count, order, parent
    metas = [
        ("生活", "life", "category", "记录生活中的点滴,偶尔写点不痛不痒的文字", 0,0,0),
        ("折腾", "coding", "category", "折腾博客的记录、各种新产品、软件的体验评测", 0,1,0),
        ("话题", "topic", "category", "聊聊时事、电影、音乐,游戏,阐述个人观点", 0,2,0),
        ("行摄", "travel", "category", "分享旅行游记攻略,一路风景以及个人摄影作品", 0,3,0)
    ]

    ts = set([t for tag in tags for t in tag])
    for t in ts:
        metas.append((t, t.lower(), "tag", "", 0, 0, 0))

    sql = """insert into typecho_metas (name, slug, type, description, count, "order", parent) values(?,?,?,?,?,?,?)"""
    with sqlite3.connect(db) as conn:
        cur = conn.cursor()
        cur.executemany(sql, metas)

    print("分类、标签写入完成!")

到这里 metas 表的操作也大体完成了,还有一个 count 字段,记录的是按分类、标签统计的文章数量,留到最后跟评论数一起更新。

3. 建立文章与分类、标签之间的关系

构建文章、分类、标签三个字典,通过循环 postscatstags 三个列表,匹配出文章与分类、文章与标签的关系,即 (cid, mid),再写入 relationships 表。

  • 文章的 { slug: cid }
  • 分类的 { slug: mid }
  • 标签的 { slug: mid }
def insert_relationships(posts, cats, tags, db):
    """文章id关联分类和标签id, 并写入数据库"""

    sql_cid = "select cid, slug from typecho_contents where type='post';"
    sql_cat = "select mid, slug from typecho_metas where type='category';"
    sql_tag = "select mid, name from typecho_metas where type='tag';"

    with sqlite3.connect(db) as conn:
        dict_cids = pd.read_sql_query(sql_cid, con=conn, index_col="slug").to_dict()['cid']
        dict_cats =pd.read_sql_query(sql_cat, con=conn, index_col="slug").to_dict()['mid']
        dict_tags =pd.read_sql_query(sql_tag, con=conn, index_col="name").to_dict()['mid']

        result = []
        for ps, cs, ts in zip(posts, cats, tags):
            if ps[7] == "post":  # post_type 为 post, 排除 page
                cid = dict_cids[ps[1]]  # 根据 slug 取 cid
                for c in cs:
                    mid = dict_cats[c.lower()]  # 根据 分类的 slug 取 mid
                    result.append((cid, mid))
                if ts:  # 如果有标签
                    for t in ts:
                        mid = dict_tags[t]  # 根据 标签的 slug 取 mid
                        result.append((cid, mid))

        relationships = pd.DataFrame(result, columns=['cid', 'mid'])
        relationships.to_sql("typecho_relationships", con=conn, if_exists="append", index=False)
    print("文章、分类、标签关联关系已更新!")

4. 整理评论数据并写入数据库

之前 Hugo 搭配的是 Artalk 评论系统,选择的也是 SQLite 数据库,所以处理评论数据就简单多了。将 Artalk 中 comments 表的 page_key 字段简单处理一下就是 Typecho 中 contents 表中的 slug,再通过这个 slug 关联出文章的 cid,即可整理出评论数据。

这里有个我踩过的坑,通过上面的方法整理出的评论数据遗漏了评论的层级关系!

在 Typecho 中,评论的 coid 关联文章 cid 得到当前评论属于哪篇文章,评论的 parent 记录当前评论的父级 coid。当我们在 Typecho 中写入文章和评论数据后,文章和评论的 ID 已经与 Artalk 中的不一样了,原 Artalk 中评论的父级 ID 已经不可用了。

我这里采取了一个间接的处理方法,因为我的评论数据中每一条评论的创建时间都不重复,所以通过评论时间来确定评论的父级 ID。即在写入评论数据时,parent 字段先写入父级评论的创建时间,后续再通过这个创建时间关联出父级评论的 coid 并更新 parent

def insert_comments(db_artalk, db):
    """写入评论数据"""

    # 获取文章评论对照关系
    sql_cid = "select cid, slug from typecho_contents;"
    with sqlite3.connect(db) as conn:    
        dict_cids = pd.read_sql_query(sql_cid, con=conn, index_col="slug").to_dict()['cid']

    # 生成待写入的评论数据
    sql_comment = """
        SELECT
            STRFTIME('%s', SUBSTR(t1.created_at, 1, 19)) as 'created',
            t1.content as 'text',
            REPLACE(REPLACE(REPLACE(t1.page_key, 'http://notesth.com/', ''), 'posts/', ''), '/', '')  as 'slug',
            t1.ua as 'agent',
            t1.ip as 'ip',
            t3.name as 'author',
            (CASE WHEN t3.name='CrazyM' THEN 1 ELSE 0 END) as 'authorId',
            1 AS 'ownerId',
            'comment' as 'type',
            'approved' as 'status',
            t3.email as 'mail',
            t3.link as 'url',
            STRFTIME('%s', SUBSTR(t2.created_at, 1, 19)) as 'parent'
        from comments t1
        LEFT JOIN comments t2
        ON t1.rid=t2.id
        LEFT JOIN
            (SELECT id, name, email, link FROM users) t3
        ON t1.user_id=t3.id;
    """

    with sqlite3.connect(db_artalk) as conn:
        comments = pd.read_sql_query(sql_comment, con=conn)
        comments['cid'] = comments["slug"].map(dict_cids)
        comments.drop(columns=['slug'], inplace=True)

    # 字段: 'cid', 'created', 'author', 'authorId', 'ownerId', 'mail', 'url', 'ip', 'agent', 'text', 'type', 'status', 'parent'

    # 写入评论数据
    with sqlite3.connect(db) as conn:
        comments.to_sql(name="typecho_comments", con=conn, index=False, if_exists="append")

    print("评论数据写入完成!")

到这里,评论数据已经处理并写入完成,这里也留下了一个待后续更新的字段 parent

5. 更新评论数、文章数、评论层级

文章、分类、标签、评论都已经写入完毕,下面就是一些收尾的工作。更新一下每篇文章的评论数,每个分类、标签下的文章数,以及评论的父级信息。

def update_count(db):
    """更新文章计数, 分类计数, 标签计数,评论父级信息"""
    with sqlite3.connect(db) as conn:
        sql_update_comment_count = """UPDATE typecho_contents
            SET
            commentsNum= IFNULL((
            SELECT num FROM (SELECT cid, COUNT(coid) as 'num' FROM typecho_comments GROUP BY cid) t
            WHERE typecho_contents.cid=t.cid), 0);
        """

        sql_update_metas_count = """UPDATE typecho_metas
            SET
            count=IFNULL((
            SELECT num FROM (SELECT mid, COUNT(cid) as 'num' FROM typecho_relationships GROUP BY mid) t
            WHERE typecho_metas.mid = t.mid), 0);
        """

        sql_comment_parent = "SELECT coid, created FROM typecho_comments;"
        sql_update_comments_parent = f"UPDATE typecho_comments SET parent=? where parent=?;"
        sql_update_comments_parent_fillna = f"UPDATE typecho_comments SET parent=0 where parent is null;"

        # 获取评论层级对照关系
        list_coids = pd.read_sql_query(sql_comment_parent, con=conn).values.tolist()
        cur = conn.cursor()

        # 更新typecho_contents commentsNum
        cur.execute(sql_update_comment_count)
        print("评论数更新完成")

        # 更新typecho_metas count
        cur.execute(sql_update_metas_count)
        print("分类、标签文章数更新完成")

        # 更新typecho_comments parent
        cur.executemany(sql_update_comments_parent, list_coids)
        cur.execute(sql_update_comments_parent_fillna)

        print("评论层级关系更新完成")

完整代码:GitHub

标签: Blog, Hugo, Typecho

仅有一条评论

  1. 怎么又搞到Typecho

添加新评论