# details of cloud
# app.py — "Subhash" Blogspot-like publisher (single-file Flask)
# Run locally:
# pip install flask flask_sqlalchemy werkzeug
# python app.py
# Admin:
# username=subhash password=admin123 (override via env; change in production)
import os, re, unicodedata, datetime, json, secrets
from typing import Optional
from flask import (
Flask, request, redirect, url_for, render_template_string,
flash, session, abort, jsonify
)
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
from sqlalchemy import func
# --------- Config ---------
# Display names, overridable through the environment.
SITE_NAME = os.environ.get("SITE_NAME", "Subhash")
AUTHOR_NAME = os.environ.get("AUTHOR_NAME", "Subhash")
# Absolute site origin for canonical links, e.g., https://blog.example.com
CANONICAL_BASE = os.environ.get("CANONICAL_BASE")
BASE_DIR = os.path.abspath(os.path.dirname(__file__))

# Database URL: DATABASE_URL when set (with a leading "postgres://" rewritten
# to "postgresql://"), otherwise a SQLite file next to this module.
DB_URL = os.environ.get("DATABASE_URL")
if not DB_URL:
    DB_URL = "sqlite:///" + os.path.join(BASE_DIR, "blog.db")
elif DB_URL.startswith("postgres://"):
    DB_URL = DB_URL.replace("postgres://", "postgresql://", 1)
app = Flask(__name__)

# bool("0") and bool("false") are both True, so the flag is parsed explicitly:
# only an empty value or a recognised "off" spelling disables the Secure flag.
_cookie_secure_raw = os.environ.get("SESSION_COOKIE_SECURE", "").strip().lower()

app.config.update(
    # Without SECRET_KEY in the environment a new random key is generated each
    # time this module is imported, so existing session cookies stop validating
    # after a restart.
    SECRET_KEY=os.environ.get("SECRET_KEY", secrets.token_hex(32)),
    SQLALCHEMY_DATABASE_URI=DB_URL,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    PREFERRED_URL_SCHEME=os.environ.get("PREFERRED_URL_SCHEME", "https"),
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
    # set SESSION_COOKIE_SECURE=1 behind HTTPS
    SESSION_COOKIE_SECURE=_cookie_secure_raw not in {"", "0", "false", "no", "off"},
)
db = SQLAlchemy(app)
# --------- Models ---------
# Association table linking posts and tags; each (post_id, tag_id) pair is the
# primary key and both foreign keys are declared with ON DELETE CASCADE.
post_tag = db.Table(
    "post_tag",
    db.metadata,
    db.Column(
        "post_id",
        db.Integer,
        db.ForeignKey("post.id", ondelete="CASCADE"),
        primary_key=True,
    ),
    db.Column(
        "tag_id",
        db.Integer,
        db.ForeignKey("tag.id", ondelete="CASCADE"),
        primary_key=True,
    ),
)
class User(db.Model):
    """Account that can sign in to the admin area."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    # Only the hash produced by generate_password_hash is stored.
    password_hash = db.Column(db.String(255), nullable=False)
    # Rows are admins unless is_admin is given explicitly.
    is_admin = db.Column(db.Boolean, default=True)

    def set_password(self, pw):
        """Hash ``pw`` and store the result in ``password_hash``."""
        self.password_hash = generate_password_hash(pw)

    def check_password(self, pw):
        """Return whether ``pw`` matches the stored hash."""
        return check_password_hash(self.password_hash, pw)
class Post(db.Model):
    """A blog entry; drafts and published posts live in the same table."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    # URL segment, unique across posts.
    slug = db.Column(db.String(220), unique=True, nullable=False, index=True)
    content = db.Column(db.Text, nullable=False)
    excerpt = db.Column(db.Text, nullable=True)
    cover_image = db.Column(db.String(400), nullable=True)
    is_published = db.Column(db.Boolean, default=False, index=True)
    published_at = db.Column(db.DateTime, nullable=True, index=True)
    # Timestamps come from datetime.utcnow (naive values).
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
    views = db.Column(db.Integer, default=0)
    # Many-to-many through post_tag; Tag gets a ``posts`` backref.
    tags = db.relationship("Tag", secondary=post_tag, backref="posts")
class Tag(db.Model):
    """Label attached to posts; both the name and the slug are unique."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    slug = db.Column(db.String(80), unique=True, nullable=False, index=True)
class Page(db.Model):
    """Standalone page (for example "About") addressed by a unique slug."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(220), unique=True, nullable=False, index=True)
    content = db.Column(db.Text, nullable=False)
    # Timestamps come from datetime.utcnow (naive values).
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.datetime.utcnow,
        onupdate=datetime.datetime.utcnow,
    )
class Comment(db.Model):
    """Reader comment attached to one post."""

    id = db.Column(db.Integer, primary_key=True)
    post_id = db.Column(
        db.Integer,
        db.ForeignKey("post.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    name = db.Column(db.String(100), nullable=False)
    email = db.Column(db.String(200), nullable=True)
    website = db.Column(db.String(255), nullable=True)
    body = db.Column(db.Text, nullable=False)
    # simple default; consider moderation in production
    approved = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
# --------- Init / Seed ---------
def init_db_and_seed():
    """Create all tables, then seed the admin account and the About page.

    The admin credentials come from ADMIN_USERNAME / ADMIN_PASSWORD and fall
    back to "subhash" / "admin123"; set both in any real deployment. Each seed
    step is skipped when the corresponding row already exists.
    """
    db.create_all()

    admin_user = os.environ.get("ADMIN_USERNAME", "subhash")
    admin_pass = os.environ.get("ADMIN_PASSWORD", "admin123")
    if not User.query.filter_by(username=admin_user).first():
        u = User(username=admin_user, is_admin=True)
        u.set_password(admin_pass)
        db.session.add(u)
        db.session.commit()

    # seed a default About page if none
    if not Page.query.filter_by(slug="about").first():
        # NOTE(review): the literal was cut off by text extraction; the sentence
        # is the one found displaced later in the file, and the <p> wrapper is
        # an assumption (page content is rendered as HTML) - confirm.
        about = Page(
            title="About",
            slug="about",
            content=f"<p>Welcome to {SITE_NAME} — a simple blog by {AUTHOR_NAME}.</p>",
        )
        db.session.add(about)
        db.session.commit()


# NOTE(review): extraction fused the following template text onto the About
# literal above; it looks like part of the base page template and is kept
# verbatim so it is not lost:
#   {% if title %}{{ title }} · {% endif %}{{ site_name }}
{% if meta_description %}{% endif %}
{% if canonical_url %}{% endif %}
{% if og %}
{% if og.description %}{% endif %}
{% if og.image %}{% endif %}
{% if og.description %}{% endif %}
{% if og.image %}{% endif %}
{% endif %}
{% if blog_jsonld %}{% endif %}
{% if page_jsonld %}{% endif %}
{% with messages = get_flashed_messages() %}
{% if messages %}{% for m in messages %}
"""
# NOTE(review): the statements below read `base`, `content` and `ctx`, none of
# which is assigned in this span, and end in a bare `return`. They appear to be
# the tail of render_page(content, **ctx) whose header and indentation were
# lost - confirm against the original file.
# JSON-LD for Blog + SearchAction
blog_jsonld = {
"@context":"https://schema.org",
"@type":"Blog",
"name": SITE_NAME,
# Site URL: the configured canonical base, else the current request's root.
"url": CANONICAL_BASE or request.url_root.rstrip("/"),
"publisher": {"@type":"Organization", "name": AUTHOR_NAME},
"potentialAction":{
"@type":"SearchAction",
# Search URL template; {search_term_string} is left as a literal placeholder.
"target": (canonical(url_for('search')) + "?q={search_term_string}") if CANONICAL_BASE else (request.url_root.rstrip("/") + url_for('search') + "?q={search_term_string}"),
"query-input":"required name=search_term_string"
}
}
# setdefault: values already supplied in ctx by the caller are kept.
ctx.setdefault("blog_jsonld", json.dumps(blog_jsonld))
ctx.setdefault("site_name", SITE_NAME)
# NOTE(review): `content` is handed to the template as a plain variable, so any
# Jinja markup inside it is not evaluated by this call - confirm that is intended.
html = render_template_string(base, content=content, **ctx)
return html
# --------- Security headers ---------
@app.after_request
def security_headers(resp):
    """Set the site's fixed security headers on ``resp`` and return it."""
    # Inline CSS used; tighten CSP if extracting assets
    csp = "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self' 'unsafe-inline'; img-src 'self' data:"
    fixed_headers = (
        ("X-Content-Type-Options", "nosniff"),
        ("Referrer-Policy", "strict-origin-when-cross-origin"),
        ("X-Frame-Options", "SAMEORIGIN"),
        ("Content-Security-Policy", csp),
    )
    for header_name, header_value in fixed_headers:
        resp.headers[header_name] = header_value
    return resp
# --------- Public Routes ---------
@app.route("/")
def home():
    """Render the front page: published posts, newest first, eight per page.

    The page number comes from ``?page=``; missing, non-numeric or values
    below 1 all resolve to page 1.
    """
    # type=int makes a non-numeric value fall back to the default instead of
    # raising ValueError (which previously surfaced as a 500).
    page = max(request.args.get("page", default=1, type=int) or 1, 1)
    per_page = 8
    q = Post.query.filter_by(is_published=True).order_by(Post.published_at.desc())
    total = q.count()
    posts = q.offset((page - 1) * per_page).limit(per_page).all()
    has_next = (page * per_page) < total
    items = [
        {
            "title": p.title,
            "url": post_url(p),
            "date": p.published_at.date().isoformat() if p.published_at else "",
            "excerpt": post_meta_description(p),
            "cover": p.cover_image,
            "tags": list(p.tags),
        }
        for p in posts
    ]
    content = """
{% for it in items %}
{% if it.cover %}
{% endif %}
{% endfor %}
"""
    desc = f"{SITE_NAME} — posts by {AUTHOR_NAME}."
    return render_page(content, items=items, page=page, has_next=has_next,
                       title=None, meta_description=desc,
                       og={"title": SITE_NAME, "description": desc, "url": canonical(url_for('home'))})
# The URL converters were lost from the rule; restored to match the view's
# parameters and the path layout produced by post_url().
@app.route("/<int:year>/<int:month>/<slug>.html", methods=["GET", "POST"])
@csrf_protect
def post_detail(year, month, slug):
    """Show one post and accept new comments on it.

    A post is public only when it is published and ``year``/``month`` match
    its publication date; otherwise it is a 404 unless a user is logged in
    (preview). POST adds a comment and redirects back to the post.
    """
    p = Post.query.filter_by(slug=slug).first_or_404()
    # Public only if published and month/year match
    publicly_visible = (
        p.is_published
        and p.published_at
        and p.published_at.year == year
        and p.published_at.month == month
    )
    # allow preview if logged in
    if not publicly_visible and "user_id" not in session:
        abort(404)

    # comments (simple)
    if request.method == "POST":
        if not p.is_published:
            abort(400)
        name = (request.form.get("name") or "").strip()
        body = (request.form.get("body") or "").strip()
        email = (request.form.get("email") or "").strip() or None
        website = (request.form.get("website") or "").strip() or None
        hp = (request.form.get("hp") or "").strip()
        if hp:  # honeypot
            flash("Something went wrong.")
            return redirect(post_url(p))
        if not name or not body:
            flash("Name and comment are required.")
            return redirect(post_url(p))
        c = Comment(post_id=p.id, name=name, email=email, website=website, body=body, approved=True)
        db.session.add(c)
        db.session.commit()
        flash("Comment posted.")
        return redirect(post_url(p))

    if p.is_published:
        # Tolerate a NULL counter on rows that never received the default.
        p.views = (p.views or 0) + 1
        db.session.commit()

    title = p.title
    description = post_meta_description(p)
    canonical_url = canonical(post_url(p))
    tags = p.tags
    # JSON-LD BlogPosting
    jsonld = {
        "@context": "https://schema.org",
        "@type": "BlogPosting",
        "headline": p.title,
        "datePublished": (p.published_at or p.created_at).strftime("%Y-%m-%d"),
        "dateModified": p.updated_at.strftime("%Y-%m-%d"),
        "author": {"@type": "Person", "name": AUTHOR_NAME},
        "publisher": {"@type": "Organization", "name": AUTHOR_NAME},
        "image": p.cover_image,
        "mainEntityOfPage": {"@type": "WebPage", "@id": canonical_url},
        "url": canonical_url,
        "keywords": ", ".join(t.name for t in tags) if tags else None,
        "description": description,
    }
    comments = Comment.query.filter_by(post_id=p.id, approved=True).order_by(Comment.created_at.asc()).all()
    content = """
{% if p.cover_image %}
{% endif %}
"""
    og = {"title": title, "description": description, "url": canonical_url, "image": p.cover_image, "type": "article"}
    return render_page(content, p=p, author=AUTHOR_NAME, comments=comments, csrf=session.get("csrf_token"),
                       title=title, meta_description=description, canonical_url=canonical_url,
                       og=og, page_jsonld=json.dumps(jsonld))
# The <tag_slug> variable was lost from the rule; restored so the view
# receives the argument it declares and url_for('tag_page', tag_slug=...) works.
@app.route("/tag/<tag_slug>")
def tag_page(tag_slug):
    """List the published posts carrying the tag ``tag_slug`` (404 if unknown)."""
    tag = Tag.query.filter_by(slug=tag_slug).first_or_404()
    posts = (
        Post.query
        .join(post_tag)
        .join(Tag)
        .filter(Tag.slug == tag_slug, Post.is_published == True)  # noqa: E712 - SQL expression
        .order_by(Post.published_at.desc())
        .all()
    )
    content = """
"""
    description = f"Posts tagged {tag.name}"
    return render_page(content, tag=tag, posts=posts, post_url=post_url,
                       title=f"Tag: {tag.name}",
                       meta_description=description,
                       og={"title": f"Tag: {tag.name} · {SITE_NAME}", "description": description,
                           "url": canonical(url_for('tag_page', tag_slug=tag_slug))})
@app.route("/tags")
def tags_page():
    """Render the index of all tags, ordered by name."""
    all_tags = Tag.query.order_by(Tag.name.asc()).all()
    content = """
"""
    description = f"All tags on {SITE_NAME}"
    return render_page(content, tags=all_tags, title="Tags", meta_description=description)
# The <slug> variable was lost from the rule; restored so the view receives
# the argument it declares and url_for('page_detail', slug=...) works.
@app.route("/p/<slug>.html")
def page_detail(slug):
    """Render the standalone page stored under ``slug`` (404 if unknown)."""
    page = Page.query.filter_by(slug=slug).first_or_404()
    # Meta description: page content with tags removed, first 160 characters.
    desc = re.sub("<[^<]+?>", "", page.content or "")[:160]
    content = """
"""
    return render_page(content, page=page, title=page.title, meta_description=desc,
                       og={"title": page.title, "description": desc,
                           "url": canonical(url_for('page_detail', slug=slug))})
@app.route("/search")
def search():
    """Case-insensitive substring search over published post titles and bodies.

    An empty ``?q=`` redirects to the home page.
    """
    q = (request.args.get("q") or "").strip()
    if not q:
        return redirect(url_for("home"))
    # Escape LIKE metacharacters so a "%" or "_" typed by a visitor is matched
    # literally instead of acting as a wildcard ("/" is the escape character).
    escaped = q.replace("/", "//").replace("%", "/%").replace("_", "/_")
    like = f"%{escaped}%"
    posts = (
        Post.query
        .filter(Post.is_published == True)  # noqa: E712 - SQL expression
        .filter(Post.title.ilike(like, escape="/") | Post.content.ilike(like, escape="/"))
        .order_by(Post.published_at.desc())
        .all()
    )
    content = """
"""
    description = f"Search results for {q}"
    return render_page(content, q=q, posts=posts, post_url=post_url, title=f"Search: {q}",
                       meta_description=description,
                       og={"title": f"Search: {q} · {SITE_NAME}", "description": description,
                           "url": canonical(url_for('search'))})
@app.route("/feed")
def feed():
    """Serve an RSS 2.0 feed of the 20 most recently published posts.

    NOTE(review): the element tags had been stripped from this block, leaving
    only the interpolated values; the markup below is rebuilt from the order
    of those values and the RSS 2.0 element names - confirm against the
    original file.
    """
    posts = Post.query.filter_by(is_published=True).order_by(Post.published_at.desc()).limit(20).all()
    items = []
    for p in posts:
        link = escape_xml(canonical(post_url(p)))
        pub = (p.published_at or p.created_at).strftime("%a, %d %b %Y %H:%M:%S +0000")
        desc = post_meta_description(p)
        items.append(
            "<item>"
            f"<title>{escape_xml(p.title)}</title>"
            f"<link>{link}</link>"
            f"<guid>{link}</guid>"
            f"<pubDate>{pub}</pubDate>"
            f"<description>{escape_xml(desc)}</description>"
            "</item>"
        )
    rss = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<rss version="2.0">\n'
        "<channel>\n"
        f"<title>{escape_xml(SITE_NAME)}</title>\n"
        f"<link>{escape_xml(canonical(url_for('home')))}</link>\n"
        f"<description>{escape_xml(SITE_NAME)} RSS Feed</description>\n"
        f"{''.join(items)}\n"
        "</channel>\n"
        "</rss>\n"
    )
    return app.response_class(rss, mimetype="application/rss+xml")
def escape_xml(s: str) -> str:
    """Return ``s`` with ``&``, ``<``, ``>`` and ``"`` replaced by XML entities.

    ``None`` or an empty string yields ``""``.
    """
    # "&" must be replaced first, otherwise the ampersands introduced by the
    # other entities would be escaped a second time.
    return (
        (s or "")
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )
@app.route("/sitemap.xml")
def sitemap():
    """Serve an XML sitemap listing the home page, all pages and published posts.

    NOTE(review): the XML tags had been stripped from this block; the
    urlset/url/loc/lastmod structure is rebuilt from the sitemaps.org
    protocol and the "loc"/"lastmod" keys used here - confirm against the
    original file.
    """
    urls = []
    # Home
    urls.append({"loc": canonical(url_for("home", _external=False)),
                 "lastmod": datetime.datetime.utcnow().date().isoformat()})
    # Pages
    for pg in Page.query.order_by(Page.updated_at.desc()).all():
        urls.append({"loc": canonical(url_for("page_detail", slug=pg.slug, _external=False)),
                     "lastmod": pg.updated_at.date().isoformat()})
    # Posts
    for p in Post.query.filter_by(is_published=True).order_by(Post.updated_at.desc()).all():
        urls.append({"loc": canonical(post_url(p)),
                     "lastmod": (p.updated_at or p.published_at or p.created_at).date().isoformat()})

    entries = []
    for u in urls:
        # Built outside the outer f-string: reusing the same quote character
        # inside a nested f-string only parses on Python 3.12+.
        lastmod = f"<lastmod>{u['lastmod']}</lastmod>" if u.get("lastmod") else ""
        entries.append(f"  <url><loc>{escape_xml(u['loc'])}</loc>{lastmod}</url>")
    xml_items = "\n".join(entries)
    xml = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
        f"{xml_items}\n"
        "</urlset>\n"
    )
    return app.response_class(xml, mimetype="application/xml")
@app.route("/robots.txt")
def robots():
    """Serve robots.txt: allow every crawler and point at the sitemap."""
    sitemap_location = canonical(url_for('sitemap'))
    body = "\n".join((
        "User-agent: *",
        "Allow: /",
        f"Sitemap: {sitemap_location}",
    ))
    return app.response_class(body, mimetype="text/plain")
@app.route("/healthz")
def healthz():
    """Liveness probe: 200 ``{"status": "ok"}`` when a trivial query succeeds.

    Any failure is reported as 500 with ``status="db_error"`` and the
    exception text.
    """
    # Imported here so the block is self-contained; SQLAlchemy 2.x rejects a
    # bare string passed to Session.execute and requires text().
    from sqlalchemy import text

    try:
        db.session.execute(text("SELECT 1"))
    except Exception as e:  # health endpoint: report every failure, never raise
        # NOTE(review): str(e) may expose connection details to anonymous
        # callers; kept because monitors may read the "error" field.
        return jsonify(status="db_error", error=str(e)), 500
    return jsonify(status="ok"), 200
# --------- Admin ---------
@app.route("/admin")
@login_required
def admin_dashboard():
    """Render the admin overview: the 12 most recently updated posts and all pages."""
    recent_posts = Post.query.order_by(Post.updated_at.desc()).limit(12).all()
    all_pages = Page.query.order_by(Page.updated_at.desc()).all()
    content = """
"""
    return render_page(content, posts=recent_posts, pages=all_pages, post_url=post_url, title="Admin")
@app.route("/admin/login", methods=["GET", "POST"])
@csrf_protect
def admin_login():
    """Show the login form and sign a user in on POST.

    After a successful login the user is sent to the same-site path given in
    the ``next`` query parameter when there is one, otherwise to the admin
    dashboard. Failed attempts re-render the form with a flash message.
    """
    if "user_id" in session:
        return redirect(url_for("admin_dashboard"))
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        u = User.query.filter_by(username=username).first()
        if u and u.check_password(password):
            session["user_id"] = u.id
            flash("Welcome back.")
            next_path = request.args.get("next") or ""
            # Follow only local absolute paths; "//host" and backslash forms
            # would let a crafted link redirect to another site.
            if next_path.startswith("/") and not next_path.startswith("//") and "\\" not in next_path:
                return redirect(next_path)
            return redirect(url_for("admin_dashboard"))
        flash("Invalid credentials.")
    content = """
"""
    return render_page(content, csrf=session.get("csrf_token"), title="Login")
@app.route("/admin/logout")
@login_required
def admin_logout():
    """Drop everything stored in the session, then return to the home page."""
    session.clear()
    # Flashed after the clear so the message is not wiped with the session.
    flash("Logged out.")
    home_url = url_for("home")
    return redirect(home_url)
@app.route("/admin/posts/new", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_new_post():
    """Show the new-post form (GET) or create a post from it (POST).

    Title and content are mandatory. The slug is derived from the title, and
    ticking "publish" stamps the post with the current UTC time.
    """
    if request.method != "POST":
        content = """
"""
        return render_page(content, csrf=session.get("csrf_token"), title="New Post")

    form = request.form
    title = (form.get("title") or "").strip()
    content = (form.get("content") or "").strip()
    excerpt = (form.get("excerpt") or "").strip() or None
    cover = (form.get("cover_image") or "").strip() or None
    raw_tags = form.get("tags") or ""
    publish = bool(form.get("publish"))

    if not (title and content):
        flash("Title and content are required.")
        return redirect(url_for("admin_new_post"))

    if publish:
        published_at = datetime.datetime.utcnow()
        outcome = " Published."
    else:
        published_at = None
        outcome = " Saved as draft."

    post = Post(
        title=title,
        slug=unique_slug(title, Post),
        content=content,
        excerpt=excerpt,
        cover_image=cover,
        is_published=publish,
        published_at=published_at,
    )
    post.tags = parse_tags(raw_tags)
    db.session.add(post)
    db.session.commit()
    flash("Post created." + outcome)
    return redirect(url_for("admin_edit_post", post_id=post.id))
# The <int:post_id> variable was lost from the rule; restored so the view
# receives the argument it declares and url_for(..., post_id=...) works.
@app.route("/admin/posts/<int:post_id>/edit", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_edit_post(post_id):
    """Edit, publish/unpublish or delete the post with id ``post_id``.

    POST with ``action=delete`` removes the post. Any other POST validates the
    form and updates the post; unpublishing clears ``published_at``. GET
    renders the edit form. Unknown ids are a 404.
    """
    p = Post.query.get_or_404(post_id)
    if request.method == "POST":
        action = request.form.get("action")
        if action == "delete":
            db.session.delete(p)
            db.session.commit()
            flash("Post deleted.")
            return redirect(url_for("admin_dashboard"))

        title = (request.form.get("title") or "").strip()
        slug = (request.form.get("slug") or "").strip()
        excerpt = (request.form.get("excerpt") or "").strip() or None
        cover = (request.form.get("cover_image") or "").strip() or None
        content = (request.form.get("content") or "").strip()
        raw_tags = request.form.get("tags") or ""
        publish = bool(request.form.get("publish"))
        if not title or not content:
            flash("Title and content are required.")
            return redirect(url_for("admin_edit_post", post_id=p.id))

        # Normalise before comparing: the stored value is the slugified form,
        # so the uniqueness check must look at that form too (checking the raw
        # input could miss a clash and fail at commit time).
        new_slug = slugify(slug) if slug else p.slug
        if new_slug != p.slug:
            if Post.query.filter(Post.slug == new_slug, Post.id != p.id).first():
                flash("Slug already in use.")
                return redirect(url_for("admin_edit_post", post_id=p.id))
            p.slug = new_slug

        p.title = title
        p.excerpt = excerpt
        p.cover_image = cover
        p.content = content
        p.tags = parse_tags(raw_tags)
        if publish and not p.is_published:
            p.is_published = True
            p.published_at = datetime.datetime.utcnow()
        elif not publish:
            p.is_published = False
            p.published_at = None
        db.session.commit()
        flash("Post updated.")
        return redirect(url_for("admin_edit_post", post_id=p.id))

    tag_line = ", ".join(t.name for t in p.tags)
    content = """
"""
    return render_page(content, p=p, tags=tag_line, csrf=session.get("csrf_token"),
                       post_url=post_url, title=f"Edit: {p.title}")
@app.route("/admin/pages/new", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_new_page():
    """Show the new-page form (GET) or create a page from it (POST).

    Title and content are mandatory. The slug comes from the submitted slug,
    or from the title when none is given.
    """
    if request.method == "POST":
        title = (request.form.get("title") or "").strip()
        slug = (request.form.get("slug") or "").strip()
        content = (request.form.get("content") or "").strip()
        if not title or not content:
            flash("Title and content required.")
            return redirect(url_for("admin_new_page"))
        # Page.slug is unique; unique_slug() appends a numeric suffix when the
        # slug is taken, instead of failing at commit time with an integrity
        # error (e.g. a second page called "About").
        pg = Page(title=title, slug=unique_slug(slug or title, Page), content=content)
        db.session.add(pg)
        db.session.commit()
        flash("Page created.")
        return redirect(url_for("admin_edit_page", page_id=pg.id))
    content = """
"""
    return render_page(content, csrf=session.get("csrf_token"), title="New Page")
# The <int:page_id> variable was lost from the rule; restored so the view
# receives the argument it declares and url_for(..., page_id=...) works.
@app.route("/admin/pages/<int:page_id>/edit", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_edit_page(page_id):
    """Edit or delete the page with id ``page_id`` (404 if unknown).

    POST with ``action=delete`` removes the page; any other POST validates the
    form and saves it. GET renders the edit form.
    """
    pg = Page.query.get_or_404(page_id)
    if request.method == "POST":
        action = request.form.get("action")
        if action == "delete":
            db.session.delete(pg)
            db.session.commit()
            flash("Page deleted.")
            return redirect(url_for("admin_dashboard"))

        title = (request.form.get("title") or "").strip()
        slug = (request.form.get("slug") or "").strip()
        content = (request.form.get("content") or "").strip()
        if not title or not content:
            flash("Title and content required.")
            return redirect(url_for("admin_edit_page", page_id=pg.id))

        pg.title = title
        # Slugify once; an empty field keeps the current slug.
        new_slug = slugify(slug) if slug else pg.slug
        if new_slug != pg.slug:
            if Page.query.filter_by(slug=new_slug).first():
                flash("Slug already in use.")
                return redirect(url_for("admin_edit_page", page_id=pg.id))
            pg.slug = new_slug
        pg.content = content
        db.session.commit()
        flash("Page updated.")
        return redirect(url_for("admin_edit_page", page_id=pg.id))

    content = """
"""
    return render_page(content, pg=pg, csrf=session.get("csrf_token"), title=f"Edit Page: {pg.title}")
# --------- Errors ---------
@app.errorhandler(404)
def not_found(e):
    """Render the site's "Not Found" page with HTTP status 404."""
    body = render_page("", title="Not Found")
    return body, 404
# --------- Startup ---------
if __name__ == "__main__":
    # Create tables and seed data before serving.
    init_db_and_seed()
    listen_port = int(os.environ.get("PORT", "5000"))
    debug_enabled = os.environ.get("FLASK_DEBUG") == "1"
    app.run(host="0.0.0.0", port=listen_port, debug=debug_enabled)
# NOTE(review): text extraction left a displaced fragment of init_db_and_seed
# at the start of this span; it is kept verbatim here so nothing is lost:
#   Welcome to {SITE_NAME} — a simple blog by {AUTHOR_NAME}.
#   ") db.session.add(about) db.session.commit()

# --------- Helpers ---------
def slugify(text: str) -> str:
    """Turn ``text`` into a lowercase ASCII slug of letters, digits and hyphens.

    Characters that cannot be reduced to ASCII are dropped; an empty result
    becomes ``"post"``.
    """
    text = unicodedata.normalize("NFKD", text)
    text = text.encode("ascii", "ignore").decode("ascii")
    text = re.sub(r"[^a-zA-Z0-9]+", "-", text).strip("-").lower()
    return text or "post"


def unique_slug(base: str, model) -> str:
    """Return a slug for ``base`` that no row of ``model`` uses yet.

    When the plain slug is taken, ``-2``, ``-3``, ... are tried in turn.
    """
    slug = slugify(base)
    if not model.query.filter_by(slug=slug).first():
        return slug
    i = 2
    while True:
        s = f"{slug}-{i}"
        if not model.query.filter_by(slug=s).first():
            return s
        i += 1


def canonical(path: str) -> str:
    """Prefix ``path`` with CANONICAL_BASE when it is configured.

    Without a configured base the path is returned unchanged (relative).
    """
    if CANONICAL_BASE:
        if not path.startswith("/"):
            path = "/" + path
        return CANONICAL_BASE.rstrip("/") + path
    # fallback to relative
    return path


def csrf_setup():
    """Store a random CSRF token in the session if it has none yet."""
    if "csrf_token" not in session:
        session["csrf_token"] = secrets.token_urlsafe(32)


@app.before_request
def before_all():
    """Make sure every request has a CSRF token available."""
    csrf_setup()


def csrf_protect(f):
    """Decorator: reject POSTs whose form ``csrf_token`` differs from the session's.

    A failed check flashes a message and redirects back to the requested URL.
    """
    @wraps(f)
    def wrap(*a, **kw):
        if request.method == "POST":
            t1 = session.get("csrf_token")
            t2 = request.form.get("csrf_token")
            # Compare as bytes: compare_digest raises TypeError for str
            # arguments containing non-ASCII characters, which a forged form
            # value could trigger.
            if not t1 or not t2 or not secrets.compare_digest(t1.encode("utf-8"), t2.encode("utf-8")):
                flash("Security check failed. Please try again.")
                return redirect(request.url)
        return f(*a, **kw)
    return wrap


def login_required(f):
    """Decorator: send visitors without ``user_id`` in the session to the login page."""
    @wraps(f)
    def wrap(*a, **kw):
        if "user_id" not in session:
            flash("Please log in.")
            return redirect(url_for("admin_login", next=request.path))
        return f(*a, **kw)
    return wrap


def post_url(post: Post) -> str:
    """Return the site-relative URL ``/YYYY/MM/<slug>.html`` for ``post``.

    The date used is ``published_at``, else ``created_at``, else the current
    UTC time.
    """
    dt = post.published_at or post.created_at or datetime.datetime.utcnow()
    return f"/{dt.year:04d}/{dt.month:02d}/{post.slug}.html"


def post_meta_description(post: Post) -> str:
    """Return a description of at most about 160 characters for ``post``.

    Uses the excerpt when present, otherwise the content with tags stripped
    (plus an ellipsis when truncated), otherwise the title.
    """
    if post.excerpt:
        return post.excerpt[:160]
    plain = re.sub("<[^<]+?>", "", post.content or "")
    return (plain[:160] + ("…" if len(plain) > 160 else "")) or post.title


def parse_tags(raw: str) -> list[Tag]:
    """Convert a comma-separated tag string into Tag objects.

    Existing tags are matched case-insensitively by name; unknown names are
    created and added to the session (not committed here). Names repeated in
    ``raw`` are used once, so the result never contains the same tag twice.
    """
    names = [t.strip() for t in (raw or "").split(",") if t.strip()]
    tags = []
    seen = set()
    for name in names:
        key = name.lower()
        if key in seen:
            # A duplicate would insert the same (post, tag) pair twice.
            continue
        seen.add(key)
        existing = Tag.query.filter(func.lower(Tag.name) == key).first()
        if existing:
            tags.append(existing)
        else:
            t = Tag(name=name, slug=unique_slug(name, Tag))
            db.session.add(t)
            tags.append(t)
    return tags


# --------- Base template ---------
# NOTE(review): only the opening of render_page survives at the end of this
# span; it is kept verbatim (it was already behind a "#" in the collapsed line):
#   def render_page(content: str, **ctx): base = """{{ m }}
{% endfor %}{% endif %}
{% endwith %}
{{ content|safe }}
{{ it.title }}
{{ it.excerpt }}
{% for t in it.tags %}{{ t.name }}{% endfor %}
{{ p.title }}
{% for t in p.tags %}{{ t.name }}{% endfor %}
{{ p.content|safe }}
Comments
{% if comments %}-
{% for c in comments %}
-
{{ c.name }}{{ c.body | e | replace('\n','
') | safe }}
{% endfor %}
Tag: {{ tag.name }}
{% if posts %}
{% for p in posts %}
{% endfor %}
{% else %}
{% endif %}
{{ p.title }}
{{ (p.excerpt or (p.content|striptags))[:160] }}{% if (p.excerpt or (p.content|striptags))|length > 160 %}…{% endif %}
All tags
{% for t in tags %} {{ t.name }} {% endfor %}{{ page.title }}
{{ page.content|safe }}
Search: {{ q }}
{% if posts %}
{% for p in posts %}
{% endfor %}
{% else %}
{% endif %}
{{ p.title }}
{{ (p.excerpt or (p.content|striptags))[:160] }}{% if (p.excerpt or (p.content|striptags))|length > 160 %}…{% endif %}
Admin
Recent posts
-
{% for p in posts %}
- {% if p.is_published %}🟢{% else %}🟡{% endif %} {{ p.title }} {% if p.is_published %}— view{% endif %} {% endfor %}
Pages
-
{% for pg in pages %}
- {{ pg.title }} — view {% endfor %}
Admin Login
New Post
Edit Post
New Page
Edit Page
Not Found
The page you requested could not be found.
Comments
Post a Comment