details of cloud

# app.py — "Subhash" Blogspot-like publisher (single-file Flask)
# Run locally:
#   pip install flask flask_sqlalchemy werkzeug
#   python app.py
# Admin:
#   username=subhash password=admin123 (override via env; change in production)
#
# NOTE(review): this file was recovered from a paste in which whitespace was
# collapsed and all HTML/XML markup was stripped. The Python logic follows the
# paste; the template markup below is a minimal reconstruction driven by the
# surviving Jinja expressions and the form field names the handlers read.

import datetime
import json
import os
import re
import secrets
import unicodedata
from functools import wraps
from typing import Optional

from flask import (
    Flask, request, redirect, url_for, render_template_string,
    flash, session, abort, jsonify,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func, text
from werkzeug.security import generate_password_hash, check_password_hash


# --------- Config ---------
def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean environment variable ("1", "true", "yes", "on" are true)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


SITE_NAME = os.environ.get("SITE_NAME", "Subhash")
AUTHOR_NAME = os.environ.get("AUTHOR_NAME", "Subhash")
CANONICAL_BASE = os.environ.get("CANONICAL_BASE")  # e.g., https://blog.example.com

BASE_DIR = os.path.abspath(os.path.dirname(__file__))
DB_URL = os.environ.get("DATABASE_URL")
if DB_URL and DB_URL.startswith("postgres://"):
    # SQLAlchemy only accepts the "postgresql://" scheme.
    DB_URL = DB_URL.replace("postgres://", "postgresql://", 1)
if not DB_URL:
    DB_URL = "sqlite:///" + os.path.join(BASE_DIR, "blog.db")

app = Flask(__name__)
app.config.update(
    # NOTE: the random fallback changes on every start (and differs between
    # worker processes); set SECRET_KEY in production so sessions survive.
    SECRET_KEY=os.environ.get("SECRET_KEY", secrets.token_hex(32)),
    SQLALCHEMY_DATABASE_URI=DB_URL,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    PREFERRED_URL_SCHEME=os.environ.get("PREFERRED_URL_SCHEME", "https"),
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE="Lax",
    # set to 1 behind HTTPS ("0"/"false" now correctly mean off)
    SESSION_COOKIE_SECURE=_env_flag("SESSION_COOKIE_SECURE"),
)
db = SQLAlchemy(app)


# --------- Models ---------
post_tag = db.Table(
    "post_tag",
    db.metadata,
    db.Column("post_id", db.Integer, db.ForeignKey("post.id", ondelete="CASCADE"), primary_key=True),
    db.Column("tag_id", db.Integer, db.ForeignKey("tag.id", ondelete="CASCADE"), primary_key=True),
)


class User(db.Model):
    """Admin account; passwords are stored as werkzeug hashes."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    is_admin = db.Column(db.Boolean, default=True)

    def set_password(self, pw):
        self.password_hash = generate_password_hash(pw)

    def check_password(self, pw):
        return check_password_hash(self.password_hash, pw)


class Post(db.Model):
    """Blog post; public only when is_published is set."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(220), unique=True, nullable=False, index=True)
    content = db.Column(db.Text, nullable=False)
    excerpt = db.Column(db.Text, nullable=True)
    cover_image = db.Column(db.String(400), nullable=True)
    is_published = db.Column(db.Boolean, default=False, index=True)
    published_at = db.Column(db.DateTime, nullable=True, index=True)
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)
    views = db.Column(db.Integer, default=0)
    tags = db.relationship("Tag", secondary=post_tag, backref="posts")


class Tag(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, nullable=False)
    slug = db.Column(db.String(80), unique=True, nullable=False, index=True)


class Page(db.Model):
    """Static page served under /p/<slug>.html."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    slug = db.Column(db.String(220), unique=True, nullable=False, index=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)


class Comment(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    post_id = db.Column(db.Integer, db.ForeignKey("post.id", ondelete="CASCADE"), nullable=False, index=True)
    name = db.Column(db.String(100), nullable=False)
    email = db.Column(db.String(200), nullable=True)
    website = db.Column(db.String(255), nullable=True)
    body = db.Column(db.Text, nullable=False)
    approved = db.Column(db.Boolean, default=True)  # simple default; consider moderation in production
    created_at = db.Column(db.DateTime, default=datetime.datetime.utcnow)


# --------- Init / Seed ---------
def init_db_and_seed():
    """Create tables and seed the admin user and a default About page.

    Must be called inside an application context.
    """
    db.create_all()
    admin_user = os.environ.get("ADMIN_USERNAME", "subhash")
    # SECURITY: the default password is for local development only.
    admin_pass = os.environ.get("ADMIN_PASSWORD", "admin123")
    if not User.query.filter_by(username=admin_user).first():
        u = User(username=admin_user, is_admin=True)
        u.set_password(admin_pass)
        db.session.add(u)
        db.session.commit()
    # seed a default About page if none
    if not Page.query.filter_by(slug="about").first():
        about = Page(
            title="About",
            slug="about",
            content=f"<p>Welcome to {SITE_NAME} — a simple blog by {AUTHOR_NAME}.</p>",
        )
        db.session.add(about)
        db.session.commit()


# --------- Helpers ---------
def slugify(text: str) -> str:
    """Return a lowercase ASCII slug of *text*; "post" when nothing survives."""
    text = unicodedata.normalize("NFKD", text)
    text = text.encode("ascii", "ignore").decode("ascii")
    text = re.sub(r"[^a-zA-Z0-9]+", "-", text).strip("-").lower()
    return text or "post"


def unique_slug(base: str, model) -> str:
    """Return slugify(base), suffixed with -2, -3, ... until unused in *model*."""
    slug = slugify(base)
    if not model.query.filter_by(slug=slug).first():
        return slug
    i = 2
    while True:
        s = f"{slug}-{i}"
        if not model.query.filter_by(slug=s).first():
            return s
        i += 1


def canonical(path: str) -> str:
    """Prefix *path* with CANONICAL_BASE when configured, else return it as-is."""
    if CANONICAL_BASE:
        if not path.startswith("/"):
            path = "/" + path
        return CANONICAL_BASE.rstrip("/") + path
    # fallback to relative
    return path


def csrf_setup():
    if "csrf_token" not in session:
        session["csrf_token"] = secrets.token_urlsafe(32)


@app.before_request
def before_all():
    csrf_setup()


def csrf_protect(f):
    """Reject POSTs whose form csrf_token does not match the session token."""
    @wraps(f)
    def wrap(*a, **kw):
        if request.method == "POST":
            t1 = session.get("csrf_token")
            t2 = request.form.get("csrf_token")
            if not t1 or not t2 or not secrets.compare_digest(t1, t2):
                flash("Security check failed. Please try again.")
                return redirect(request.url)
        return f(*a, **kw)
    return wrap


def login_required(f):
    @wraps(f)
    def wrap(*a, **kw):
        if "user_id" not in session:
            flash("Please log in.")
            return redirect(url_for("admin_login", next=request.path))
        return f(*a, **kw)
    return wrap


def post_url(post: Post) -> str:
    dt = post.published_at or post.created_at or datetime.datetime.utcnow()
    return f"/{dt.year:04d}/{dt.month:02d}/{post.slug}.html"


def post_meta_description(post: Post) -> str:
    """Return the excerpt (or tag-stripped content) capped at 160 characters."""
    if post.excerpt:
        return post.excerpt[:160]
    plain = re.sub("<[^<]+?>", "", post.content or "")
    return (plain[:160] + ("…" if len(plain) > 160 else "")) or post.title


def parse_tags(raw: str) -> list[Tag]:
    """Turn a comma-separated string into Tag rows, creating missing ones.

    Names are matched case-insensitively and de-duplicated so the same tag is
    never attached to a post twice.
    """
    tags = []
    seen = set()
    for name in (t.strip() for t in (raw or "").split(",")):
        key = name.lower()
        if not name or key in seen:
            continue
        seen.add(key)
        existing = Tag.query.filter(func.lower(Tag.name) == key).first()
        if existing:
            tags.append(existing)
        else:
            t = Tag(name=name, slug=unique_slug(name, Tag))
            db.session.add(t)
            tags.append(t)
    return tags


def _jsonld(data: dict) -> str:
    """Serialize JSON-LD for an inline <script>, dropping None values.

    "<" is escaped so a value containing "</script>" cannot close the tag.
    """
    cleaned = {k: v for k, v in data.items() if v is not None}
    return json.dumps(cleaned).replace("<", "\\u003c")


# --------- Base template ---------
BASE_TEMPLATE = """<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{% if title %}{{ title }} · {% endif %}{{ site_name }}</title>
{% if meta_description %}<meta name="description" content="{{ meta_description }}">{% endif %}
{% if canonical_url %}<link rel="canonical" href="{{ canonical_url }}">{% endif %}
<link rel="alternate" type="application/rss+xml" title="{{ site_name }}" href="{{ url_for('feed') }}">
{% if og %}
<meta property="og:title" content="{{ og.title }}">
<meta property="og:type" content="{{ og.type or 'website' }}">
<meta property="og:url" content="{{ og.url }}">
{% if og.description %}<meta property="og:description" content="{{ og.description }}">{% endif %}
{% if og.image %}<meta property="og:image" content="{{ og.image }}">{% endif %}
<meta name="twitter:card" content="summary">
<meta name="twitter:title" content="{{ og.title }}">
{% if og.description %}<meta name="twitter:description" content="{{ og.description }}">{% endif %}
{% if og.image %}<meta name="twitter:image" content="{{ og.image }}">{% endif %}
{% endif %}
{% if blog_jsonld %}<script type="application/ld+json">{{ blog_jsonld|safe }}</script>{% endif %}
{% if page_jsonld %}<script type="application/ld+json">{{ page_jsonld|safe }}</script>{% endif %}
<style>
body { font-family: system-ui, sans-serif; max-width: 760px; margin: 0 auto; padding: 1rem; line-height: 1.6; }
header nav a, .tags a, .pager a { margin-right: .6rem; }
img { max-width: 100%; height: auto; }
.flash { padding: .5rem .75rem; background: #fff8d6; border: 1px solid #e6d98a; margin: .5rem 0; }
.comment-body { white-space: pre-line; }
.hp { position: absolute; left: -9999px; }
label { display: block; margin: .5rem 0; }
input[type=text], input[type=password], input[type=email], input[type=url], textarea { width: 100%; box-sizing: border-box; }
</style>
</head>
<body>
<header>
  <h1><a href="{{ url_for('home') }}">{{ site_name }}</a></h1>
  <nav>
    <a href="{{ url_for('home') }}">Home</a>
    <a href="{{ url_for('tags_page') }}">Tags</a>
    <a href="{{ url_for('page_detail', slug='about') }}">About</a>
    <a href="{{ url_for('feed') }}">RSS</a>
  </nav>
  <form action="{{ url_for('search') }}" method="get">
    <input type="text" name="q" placeholder="Search">
    <button type="submit">Search</button>
  </form>
</header>
<main>
{% with messages = get_flashed_messages() %}
{% if messages %}{% for m in messages %}
<div class="flash">{{ m }}</div>
{% endfor %}{% endif %}
{% endwith %}
{{ content|safe }}
</main>
</body>
</html>
"""


def render_page(content: str, **ctx):
    """Render the *content* template with *ctx*, then wrap it in the site layout.

    *content* is itself Jinja source, so it is rendered first; inserting it
    raw into the layout would emit its ``{% ... %}`` tags verbatim.
    """
    site_root = CANONICAL_BASE or request.url_root.rstrip("/")
    if CANONICAL_BASE:
        search_target = canonical(url_for("search")) + "?q={search_term_string}"
    else:
        search_target = request.url_root.rstrip("/") + url_for("search") + "?q={search_term_string}"
    # JSON-LD for Blog + SearchAction
    blog_jsonld = {
        "@context": "https://schema.org",
        "@type": "Blog",
        "name": SITE_NAME,
        "url": site_root,
        "publisher": {"@type": "Organization", "name": AUTHOR_NAME},
        "potentialAction": {
            "@type": "SearchAction",
            "target": search_target,
            "query-input": "required name=search_term_string",
        },
    }
    ctx.setdefault("blog_jsonld", _jsonld(blog_jsonld))
    ctx.setdefault("site_name", SITE_NAME)
    body = render_template_string(content, **ctx)
    return render_template_string(BASE_TEMPLATE, content=body, **ctx)


# --------- Security headers ---------
@app.after_request
def security_headers(resp):
    resp.headers["X-Content-Type-Options"] = "nosniff"
    resp.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    resp.headers["X-Frame-Options"] = "SAMEORIGIN"
    # Inline CSS used; tighten CSP if extracting assets
    resp.headers["Content-Security-Policy"] = "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self' 'unsafe-inline'; img-src 'self' data:"
    return resp


# --------- Public Routes ---------
@app.route("/")
def home():
    """Paginated list of published posts, newest first."""
    # type=int falls back to the default on junk like ?page=abc instead of a 500.
    page = max(request.args.get("page", 1, type=int) or 1, 1)
    per_page = 8
    q = Post.query.filter_by(is_published=True).order_by(Post.published_at.desc())
    total = q.count()
    posts = q.offset((page - 1) * per_page).limit(per_page).all()
    has_next = (page * per_page) < total
    items = [
        {
            "title": p.title,
            "url": post_url(p),
            "date": p.published_at.date().isoformat() if p.published_at else "",
            "excerpt": post_meta_description(p),
            "cover": p.cover_image,
            "tags": list(p.tags),
        }
        for p in posts
    ]
    content = """
{% for it in items %}
<article>
  {% if it.cover %}<a href="{{ it.url }}"><img src="{{ it.cover }}" alt="{{ it.title }}"></a>{% endif %}
  <h2><a href="{{ it.url }}">{{ it.title }}</a></h2>
  <p>{{ it.excerpt }}</p>
  <p class="tags">{% for t in it.tags %}<a href="{{ url_for('tag_page', tag_slug=t.slug) }}">{{ t.name }}</a>{% endfor %}</p>
</article>
{% endfor %}
<nav class="pager">
  {% if page > 1 %}<a href="{{ url_for('home', page=page - 1) }}">« Newer</a>{% endif %}
  {% if has_next %}<a href="{{ url_for('home', page=page + 1) }}">Older »</a>{% endif %}
</nav>
"""
    desc = f"{SITE_NAME} — posts by {AUTHOR_NAME}."
    return render_page(content, items=items, page=page, has_next=has_next,
                       title=None, meta_description=desc,
                       og={"title": SITE_NAME, "description": desc, "url": canonical(url_for('home'))})


@app.route("/<int:year>/<int:month>/<slug>.html", methods=["GET", "POST"])
@csrf_protect
def post_detail(year, month, slug):
    """Show a post (GET) or accept a comment on it (POST)."""
    p = Post.query.filter_by(slug=slug).first_or_404()
    # Public only if published and month/year match
    if not p.is_published or not p.published_at or p.published_at.year != year or p.published_at.month != month:
        # allow preview if logged in
        if "user_id" not in session:
            abort(404)

    # comments (simple)
    if request.method == "POST":
        if not p.is_published:
            abort(400)
        # Bound each field to its column width so strict databases do not error.
        name = (request.form.get("name") or "").strip()[:100]
        body = (request.form.get("body") or "").strip()
        email = (request.form.get("email") or "").strip()[:200] or None
        website = (request.form.get("website") or "").strip()[:255] or None
        hp = (request.form.get("hp") or "").strip()
        if hp:  # honeypot
            flash("Something went wrong.")
            return redirect(post_url(p))
        if not name or not body:
            flash("Name and comment are required.")
            return redirect(post_url(p))
        c = Comment(post_id=p.id, name=name, email=email, website=website, body=body, approved=True)
        db.session.add(c)
        db.session.commit()
        flash("Comment posted.")
        return redirect(post_url(p))

    if p.is_published:
        # Atomic increment. updated_at is set to itself explicitly: a column
        # present in the SET clause does not get its onupdate default, so a
        # page view no longer counts as a content modification.
        Post.query.filter_by(id=p.id).update(
            {Post.views: func.coalesce(Post.views, 0) + 1, Post.updated_at: Post.updated_at},
            synchronize_session=False,
        )
        db.session.commit()

    title = p.title
    description = post_meta_description(p)
    canonical_url = canonical(post_url(p))
    tags = p.tags
    # JSON-LD BlogPosting
    jsonld = {
        "@context": "https://schema.org",
        "@type": "BlogPosting",
        "headline": p.title,
        "datePublished": (p.published_at or p.created_at).strftime("%Y-%m-%d"),
        "dateModified": p.updated_at.strftime("%Y-%m-%d"),
        "author": {"@type": "Person", "name": AUTHOR_NAME},
        "publisher": {"@type": "Organization", "name": AUTHOR_NAME},
        "image": p.cover_image,
        "mainEntityOfPage": {"@type": "WebPage", "@id": canonical_url},
        "url": canonical_url,
        "keywords": ", ".join([t.name for t in tags]) if tags else None,
        "description": description,
    }
    comments = Comment.query.filter_by(post_id=p.id, approved=True).order_by(Comment.created_at.asc()).all()
    # p.content is admin-authored HTML and rendered as-is; comment bodies are
    # autoescaped and keep their line breaks through CSS (white-space: pre-line).
    content = """
<article>
  {% if p.cover_image %}<img src="{{ p.cover_image }}" alt="{{ p.title }}">{% endif %}
  <h1>{{ p.title }}</h1>
  <p class="tags">{% for t in p.tags %}<a href="{{ url_for('tag_page', tag_slug=t.slug) }}">{{ t.name }}</a>{% endfor %}</p>
  <div class="post-content">{{ p.content|safe }}</div>
</article>
<section id="comments">
  <h2>Comments</h2>
  {% if comments %}
  <ul>
    {% for c in comments %}
    <li>
      <strong>{% if c.website and (c.website.startswith('http://') or c.website.startswith('https://')) %}<a href="{{ c.website }}" rel="nofollow ugc noopener">{{ c.name }}</a>{% else %}{{ c.name }}{% endif %}</strong>
      <p class="comment-body">{{ c.body }}</p>
    </li>
    {% endfor %}
  </ul>
  {% else %}
  <p>No comments yet.</p>
  {% endif %}
  {% if p.is_published %}
  <form method="post">
    <input type="hidden" name="csrf_token" value="{{ csrf }}">
    <label>Name <input type="text" name="name" maxlength="100" required></label>
    <label>Email (optional) <input type="email" name="email" maxlength="200"></label>
    <label>Website (optional) <input type="url" name="website" maxlength="255"></label>
    <label class="hp" aria-hidden="true">Leave this empty <input type="text" name="hp" tabindex="-1" autocomplete="off"></label>
    <label>Comment <textarea name="body" rows="5" required></textarea></label>
    <button type="submit">Post comment</button>
  </form>
  {% endif %}
</section>
"""
    og = {"title": title, "description": description, "url": canonical_url, "image": p.cover_image, "type": "article"}
    return render_page(content, p=p, author=AUTHOR_NAME, comments=comments, csrf=session.get("csrf_token"),
                       title=title, meta_description=description, canonical_url=canonical_url,
                       og=og, page_jsonld=_jsonld(jsonld))


@app.route("/tag/<tag_slug>")
def tag_page(tag_slug):
    tag = Tag.query.filter_by(slug=tag_slug).first_or_404()
    posts = (Post.query
             .join(post_tag).join(Tag)
             .filter(Tag.slug == tag_slug, Post.is_published.is_(True))
             .order_by(Post.published_at.desc())
             .all())
    content = """
<h1>Tag: {{ tag.name }}</h1>
{% if posts %}
{% for p in posts %}
<article>
  <h2><a href="{{ post_url(p) }}">{{ p.title }}</a></h2>
  <p>{{ (p.excerpt or (p.content|striptags))[:160] }}{% if (p.excerpt or (p.content|striptags))|length > 160 %}…{% endif %}</p>
</article>
{% endfor %}
{% else %}
<p>No posts with this tag yet.</p>
{% endif %}
"""
    return render_page(content, tag=tag, posts=posts, post_url=post_url,
                       title=f"Tag: {tag.name}",
                       meta_description=f"Posts tagged {tag.name}",
                       og={"title": f"Tag: {tag.name} · {SITE_NAME}", "description": f"Posts tagged {tag.name}", "url": canonical(url_for('tag_page', tag_slug=tag_slug))})


@app.route("/tags")
def tags_page():
    tags = Tag.query.order_by(Tag.name.asc()).all()
    content = """
<h1>All tags</h1>
<p class="tags">{% for t in tags %} <a href="{{ url_for('tag_page', tag_slug=t.slug) }}">{{ t.name }}</a> {% endfor %}</p>
"""
    return render_page(content, tags=tags, title="Tags", meta_description=f"All tags on {SITE_NAME}")


@app.route("/p/<slug>.html")
def page_detail(slug):
    page = Page.query.filter_by(slug=slug).first_or_404()
    desc = re.sub("<[^<]+?>", "", page.content or "")[:160]
    content = """
<article>
  <h1>{{ page.title }}</h1>
  <div class="page-content">{{ page.content|safe }}</div>
</article>
"""
    return render_page(content, page=page, title=page.title, meta_description=desc,
                       og={"title": page.title, "description": desc, "url": canonical(url_for('page_detail', slug=slug))})


@app.route("/search")
def search():
    q = (request.args.get("q") or "").strip()
    if not q:
        return redirect(url_for("home"))
    like = f"%{q}%"
    posts = (Post.query
             .filter(Post.is_published.is_(True))
             .filter((Post.title.ilike(like)) | (Post.content.ilike(like)))
             .order_by(Post.published_at.desc()).all())
    content = """
<h1>Search: {{ q }}</h1>
{% if posts %}
{% for p in posts %}
<article>
  <h2><a href="{{ post_url(p) }}">{{ p.title }}</a></h2>
  <p>{{ (p.excerpt or (p.content|striptags))[:160] }}{% if (p.excerpt or (p.content|striptags))|length > 160 %}…{% endif %}</p>
</article>
{% endfor %}
{% else %}
<p>No results.</p>
{% endif %}
"""
    return render_page(content, q=q, posts=posts, post_url=post_url,
                       title=f"Search: {q}",
                       meta_description=f"Search results for {q}",
                       og={"title": f"Search: {q} · {SITE_NAME}", "description": f"Search results for {q}", "url": canonical(url_for('search'))})


def escape_xml(s: str) -> str:
    """Escape &, <, > and double quotes for use in XML text or attributes."""
    return (
        (s or "")
        .replace("&", "&amp;")  # must run first so later entities are not re-escaped
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
    )


@app.route("/feed")
def feed():
    """RSS 2.0 feed of the 20 most recently published posts."""
    posts = Post.query.filter_by(is_published=True).order_by(Post.published_at.desc()).limit(20).all()
    items = []
    for p in posts:
        link = escape_xml(canonical(post_url(p)))
        pub = (p.published_at or p.created_at).strftime("%a, %d %b %Y %H:%M:%S +0000")
        desc = post_meta_description(p)
        items.append(
            f"<item><title>{escape_xml(p.title)}</title><link>{link}</link>"
            f"<guid>{link}</guid><pubDate>{pub}</pubDate>"
            f"<description>{escape_xml(desc)}</description></item>"
        )
    rss = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<rss version="2.0"><channel>'
        f"<title>{escape_xml(SITE_NAME)}</title>"
        f"<link>{escape_xml(canonical(url_for('home')))}</link>"
        f"<description>{escape_xml(SITE_NAME)} RSS Feed</description>"
        f"{''.join(items)}"
        "</channel></rss>"
    )
    return app.response_class(rss, mimetype="application/rss+xml")


@app.route("/sitemap.xml")
def sitemap():
    urls = []
    # Home
    urls.append({"loc": canonical(url_for("home", _external=False)), "lastmod": datetime.datetime.utcnow().date().isoformat()})
    # Pages
    for pg in Page.query.order_by(Page.updated_at.desc()).all():
        urls.append({"loc": canonical(url_for("page_detail", slug=pg.slug, _external=False)), "lastmod": pg.updated_at.date().isoformat()})
    # Posts
    for p in Post.query.filter_by(is_published=True).order_by(Post.updated_at.desc()).all():
        urls.append({"loc": canonical(post_url(p)), "lastmod": (p.updated_at or p.published_at or p.created_at).date().isoformat()})

    rows = []
    for u in urls:
        lastmod = u.get("lastmod")
        lastmod_xml = f"<lastmod>{lastmod}</lastmod>" if lastmod else ""
        rows.append(f"  <url><loc>{escape_xml(u['loc'])}</loc>{lastmod_xml}</url>")
    xml_items = "\n".join(rows)
    xml = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
        f"{xml_items}\n"
        "</urlset>"
    )
    return app.response_class(xml, mimetype="application/xml")


@app.route("/robots.txt")
def robots():
    lines = [
        "User-agent: *",
        "Allow: /",
        f"Sitemap: {canonical(url_for('sitemap'))}",
    ]
    return app.response_class("\n".join(lines), mimetype="text/plain")


@app.route("/healthz")
def healthz():
    """Liveness probe: 200 when a trivial query succeeds, 500 otherwise."""
    try:
        # SQLAlchemy 2.x rejects plain strings; textual SQL must be wrapped in text().
        db.session.execute(text("SELECT 1"))
    except Exception as e:  # top-level boundary: report any DB failure as unhealthy
        app.logger.exception("health check failed")
        # Only the exception class is exposed; the message may contain
        # connection details and goes to the log instead.
        return jsonify(status="db_error", error=type(e).__name__), 500
    return jsonify(status="ok"), 200


# --------- Admin ---------
@app.route("/admin")
@login_required
def admin_dashboard():
    posts = Post.query.order_by(Post.updated_at.desc()).limit(12).all()
    pages = Page.query.order_by(Page.updated_at.desc()).all()
    content = """
<h1>Admin</h1>
<p>
  <a href="{{ url_for('admin_new_post') }}">New Post</a>
  <a href="{{ url_for('admin_new_page') }}">New Page</a>
  <a href="{{ url_for('admin_logout') }}">Logout</a>
</p>
<h2>Recent posts</h2>
<ul>
  {% for p in posts %}
  <li>{% if p.is_published %}🟢{% else %}🟡{% endif %} <a href="{{ url_for('admin_edit_post', post_id=p.id) }}">{{ p.title }}</a> {% if p.is_published %}— <a href="{{ post_url(p) }}">view</a>{% endif %}</li>
  {% endfor %}
</ul>
<h2>Pages</h2>
<ul>
  {% for pg in pages %}
  <li><a href="{{ url_for('admin_edit_page', page_id=pg.id) }}">{{ pg.title }}</a></li>
  {% endfor %}
</ul>
"""
    return render_page(content, posts=posts, pages=pages, post_url=post_url, title="Admin")


@app.route("/admin/login", methods=["GET", "POST"])
@csrf_protect
def admin_login():
    if "user_id" in session:
        return redirect(url_for("admin_dashboard"))
    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        u = User.query.filter_by(username=username).first()
        if u and u.check_password(password):
            session["user_id"] = u.id
            flash("Welcome back.")
            return redirect(url_for("admin_dashboard"))
        flash("Invalid credentials.")
    content = """
<h1>Admin Login</h1>
<form method="post">
  <input type="hidden" name="csrf_token" value="{{ csrf }}">
  <label>Username <input type="text" name="username" required></label>
  <label>Password <input type="password" name="password" required></label>
  <button type="submit">Log in</button>
</form>
"""
    return render_page(content, csrf=session.get("csrf_token"), title="Login")


@app.route("/admin/logout")
@login_required
def admin_logout():
    session.clear()
    flash("Logged out.")
    return redirect(url_for("home"))


@app.route("/admin/posts/new", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_new_post():
    if request.method == "POST":
        title = (request.form.get("title") or "").strip()
        content = (request.form.get("content") or "").strip()
        excerpt = (request.form.get("excerpt") or "").strip() or None
        cover = (request.form.get("cover_image") or "").strip() or None
        raw_tags = request.form.get("tags") or ""
        publish = bool(request.form.get("publish"))
        if not title or not content:
            flash("Title and content are required.")
            return redirect(url_for("admin_new_post"))
        post = Post(
            title=title,
            slug=unique_slug(title, Post),
            content=content,
            excerpt=excerpt,
            cover_image=cover,
            is_published=publish,
            published_at=datetime.datetime.utcnow() if publish else None,
        )
        post.tags = parse_tags(raw_tags)
        db.session.add(post)
        db.session.commit()
        flash("Post created." + (" Published." if publish else " Saved as draft."))
        return redirect(url_for("admin_edit_post", post_id=post.id))
    content = """
<h1>New Post</h1>
<form method="post">
  <input type="hidden" name="csrf_token" value="{{ csrf }}">
  <label>Title <input type="text" name="title" maxlength="200" required></label>
  <label>Excerpt <textarea name="excerpt" rows="2"></textarea></label>
  <label>Cover image URL <input type="text" name="cover_image" maxlength="400"></label>
  <label>Tags (comma-separated) <input type="text" name="tags"></label>
  <label>Content (HTML) <textarea name="content" rows="16" required></textarea></label>
  <label><input type="checkbox" name="publish" value="1"> Publish</label>
  <button type="submit">Save</button>
</form>
"""
    return render_page(content, csrf=session.get("csrf_token"), title="New Post")


@app.route("/admin/posts/<int:post_id>/edit", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_edit_post(post_id):
    p = Post.query.get_or_404(post_id)
    if request.method == "POST":
        action = request.form.get("action")
        if action == "delete":
            db.session.delete(p)
            db.session.commit()
            flash("Post deleted.")
            return redirect(url_for("admin_dashboard"))
        title = (request.form.get("title") or "").strip()
        slug = (request.form.get("slug") or "").strip()
        excerpt = (request.form.get("excerpt") or "").strip() or None
        cover = (request.form.get("cover_image") or "").strip() or None
        content = (request.form.get("content") or "").strip()
        raw_tags = request.form.get("tags") or ""
        publish = bool(request.form.get("publish"))
        if not title or not content:
            flash("Title and content are required.")
            return redirect(url_for("admin_edit_post", post_id=p.id))
        # Validate the slug before mutating the post, and compare the
        # normalized form: that is the value actually stored, so checking the
        # raw input could miss a clash and hit the unique constraint.
        new_slug = slugify(slug) if slug else p.slug
        if new_slug != p.slug:
            if Post.query.filter(Post.slug == new_slug, Post.id != p.id).first():
                flash("Slug already in use.")
                return redirect(url_for("admin_edit_post", post_id=p.id))
        p.title = title
        p.slug = new_slug
        p.excerpt = excerpt
        p.cover_image = cover
        p.content = content
        p.tags = parse_tags(raw_tags)
        if publish and not p.is_published:
            p.is_published = True
            p.published_at = datetime.datetime.utcnow()
        elif not publish:
            p.is_published = False
            p.published_at = None
        db.session.commit()
        flash("Post updated.")
        return redirect(url_for("admin_edit_post", post_id=p.id))
    tag_line = ", ".join([t.name for t in p.tags])
    content = """
<h1>Edit Post</h1>
<form method="post">
  <input type="hidden" name="csrf_token" value="{{ csrf }}">
  <label>Title <input type="text" name="title" maxlength="200" value="{{ p.title }}" required></label>
  <label>Slug <input type="text" name="slug" maxlength="220" value="{{ p.slug }}"></label>
  <label>Excerpt <textarea name="excerpt" rows="2">{{ p.excerpt or '' }}</textarea></label>
  <label>Cover image URL <input type="text" name="cover_image" maxlength="400" value="{{ p.cover_image or '' }}"></label>
  <label>Tags (comma-separated) <input type="text" name="tags" value="{{ tags }}"></label>
  <label>Content (HTML) <textarea name="content" rows="16" required>{{ p.content }}</textarea></label>
  <label><input type="checkbox" name="publish" value="1" {% if p.is_published %}checked{% endif %}> Published</label>
  <button type="submit" name="action" value="save">Save</button>
  <button type="submit" name="action" value="delete" onclick="return confirm('Delete this post?')">Delete</button>
</form>
{% if p.is_published %}<p><a href="{{ post_url(p) }}">View</a></p>{% endif %}
"""
    return render_page(content, p=p, tags=tag_line, csrf=session.get("csrf_token"), post_url=post_url, title=f"Edit: {p.title}")


@app.route("/admin/pages/new", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_new_page():
    if request.method == "POST":
        title = (request.form.get("title") or "").strip()
        slug = (request.form.get("slug") or "").strip()
        content = (request.form.get("content") or "").strip()
        if not title or not content:
            flash("Title and content required.")
            return redirect(url_for("admin_new_page"))
        # unique_slug avoids tripping the unique constraint on a repeated slug.
        pg = Page(title=title, slug=unique_slug(slug or title, Page), content=content)
        db.session.add(pg)
        db.session.commit()
        flash("Page created.")
        return redirect(url_for("admin_edit_page", page_id=pg.id))
    content = """
<h1>New Page</h1>
<form method="post">
  <input type="hidden" name="csrf_token" value="{{ csrf }}">
  <label>Title <input type="text" name="title" maxlength="200" required></label>
  <label>Slug (optional) <input type="text" name="slug" maxlength="220"></label>
  <label>Content (HTML) <textarea name="content" rows="16" required></textarea></label>
  <button type="submit">Save</button>
</form>
"""
    return render_page(content, csrf=session.get("csrf_token"), title="New Page")


@app.route("/admin/pages/<int:page_id>/edit", methods=["GET", "POST"])
@login_required
@csrf_protect
def admin_edit_page(page_id):
    pg = Page.query.get_or_404(page_id)
    if request.method == "POST":
        action = request.form.get("action")
        if action == "delete":
            db.session.delete(pg)
            db.session.commit()
            flash("Page deleted.")
            return redirect(url_for("admin_dashboard"))
        title = (request.form.get("title") or "").strip()
        slug = (request.form.get("slug") or "").strip()
        content = (request.form.get("content") or "").strip()
        if not title or not content:
            flash("Title and content required.")
            return redirect(url_for("admin_edit_page", page_id=pg.id))
        new_slug = slugify(slug) if slug else pg.slug
        if new_slug != pg.slug:
            if Page.query.filter(Page.slug == new_slug, Page.id != pg.id).first():
                flash("Slug already in use.")
                return redirect(url_for("admin_edit_page", page_id=pg.id))
        pg.title = title
        pg.slug = new_slug
        pg.content = content
        db.session.commit()
        flash("Page updated.")
        return redirect(url_for("admin_edit_page", page_id=pg.id))
    content = """
<h1>Edit Page</h1>
<form method="post">
  <input type="hidden" name="csrf_token" value="{{ csrf }}">
  <label>Title <input type="text" name="title" maxlength="200" value="{{ pg.title }}" required></label>
  <label>Slug <input type="text" name="slug" maxlength="220" value="{{ pg.slug }}"></label>
  <label>Content (HTML) <textarea name="content" rows="16" required>{{ pg.content }}</textarea></label>
  <button type="submit" name="action" value="save">Save</button>
  <button type="submit" name="action" value="delete" onclick="return confirm('Delete this page?')">Delete</button>
</form>
<p><a href="{{ url_for('page_detail', slug=pg.slug) }}">View</a></p>
"""
    return render_page(content, pg=pg, csrf=session.get("csrf_token"), title=f"Edit Page: {pg.title}")


# --------- Errors ---------
@app.errorhandler(404)
def not_found(e):
    return render_page("<h1>Not Found</h1><p>The page you requested could not be found.</p>", title="Not Found"), 404


# --------- Startup ---------
if __name__ == "__main__":
    # Flask-SQLAlchemy 3.x needs an application context for create_all/queries.
    with app.app_context():
        init_db_and_seed()
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "5000")), debug=os.environ.get("FLASK_DEBUG") == "1")

Comments

Popular posts from this blog

english note

First blog

CV making