Source code for provit.browser.backend

import os
import sys
import time
import webbrowser
import logging

from pathlib import Path
from multiprocessing import Process

from ..config import get_config
from ..home import load_directories, remove_directories, add_directory
from ..agent import load_agent_profiles, load_agent_profile
from ..prov import Provenance, load_prov

from flask_cors import CORS, cross_origin
from flask import Flask, jsonify, render_template, request

app = Flask(__name__)
cors = CORS(app)
app.config["CORS_HEADERS"] = "Content-Type"

# Disable all console logging
""" app.logger.disabled = True
log = logging.getLogger('werkzeug')
log.disabled = True """

PROVIS_PORT = 5555


cfg = get_config()


# HOME VIEW (DIRECTORY LIST)
[docs]@app.route("/directories", methods=["GET", "POST"]) def directories(): if request.method == "GET": dirs = load_directories() elif request.method == "POST": new_directory = request.json["directory"] dirs = add_directory(new_directory) return jsonify({"directories": dirs})
[docs]@app.route("/directories/remove", methods=["POST"]) def delete_directories(): directory = request.json["directory"] dirs = remove_directories(directory) return jsonify({"directories": dirs})
# FILEBROWSER ENDPOINT
[docs]@app.route("/filebrowser", methods=["POST"]) def file_browser(): directory = request.json["directory"] with_files = request.json["withFiles"] with_hidden = request.json["withHidden"] if directory == "" or not os.path.exists(directory): directory = str(Path.home()) print("dir") print(directory) print(with_files) files = [] dirs = [] for filename in os.listdir(directory): filepath = os.path.join(directory, filename) if ( not filename.endswith(".prov") and not os.path.isdir(filepath) and with_files ): if not with_hidden and not filename.startswith("."): files.append({"filename": filename, "filepath": filepath}) if os.path.isdir(filepath): if not with_hidden and not filename.startswith("."): dirs.append({"dirname": filename, "dirpath": filepath}) return jsonify( { "files": sorted(files, key=lambda x: x["filename"]), "dirs": sorted(dirs, key=lambda x: x["dirname"]), } )
# DIRECTORY VIEW (FILE LIST)
[docs]def files_with_prov(directory): for filename in os.listdir(directory): filepath = os.path.join(directory, filename) if not filename.endswith(".prov") and not os.path.isdir(filepath): yield {"name": filename, "path": filepath}
def _build_file_list(directory): files = [] for filename in os.listdir(directory): filepath = os.path.join(directory, filename) if not filename.endswith(".prov") and not os.path.isdir(filepath): prov = None prov_file = "{}.prov".format(filepath) if os.path.exists(prov_file): print(prov_file) prov_data = Provenance(filepath) prov_tree = prov_data.tree() print(prov_tree) print(prov_data.get_primary_sources()) prov = { "last_activity": prov_tree["activity_desc"], "last_agent": prov_tree["agent"], "timestamp": prov_tree["ended_at"], "last_location": prov_tree["location"], } files.append({"filename": filename, "filepath": filepath, "prov": prov}) files = sorted(files, key=lambda x: x["filename"].lower()) return files
[docs]@app.route("/directory", methods=["POST"]) def file_list(): directory = request.json["directory"] files = _build_file_list(directory) return jsonify({"files": files})
[docs]@app.route("/directory/update", methods=["POST"]) def update_file_list(): directory = request.json["directory"] for f in files_with_prov(directory): prov = Provenance(f["path"]) if prov.tree()["location"] != f["path"]: prov.add( agents=[], activity="move_file", description="file moved to new location -> {}".format(directory), ) prov.save() files = _build_file_list(directory) return jsonify({"files": files})
# AGENTS LIST
[docs]@app.route("/agents") def agent_list(): agents = load_agent_profiles() agents_structured = {cfg.person: [], cfg.software: [], cfg.organization: []} for agent in agents: agents_structured[agent.type].append(agent.to_json()) print(agents_structured.keys()) return jsonify({"agents": agents_structured})
# FILE VIEW ENDPOINT
[docs]@app.route("/file", methods=["POST"]) def get_prov_data(): filepath = request.json["filepath"] prov = load_prov(filepath) if not prov: return jsonify({"hasProv": False}) return jsonify( { "hasProv": True, "root_event": prov.entity, "prov": prov.tree(), "agents": prov.get_agents(), } )
[docs]@app.route("/file/remove", methods=["POST"]) def remove_prov_data(): filepath = request.json["filepath"] prov = Provenance(filepath) prov.remove_last_event() prov.save() if not prov.entity: os.remove("{}.prov".format(filepath)) prov = load_prov(filepath) if not prov: return jsonify({"hasProv": False}) return jsonify( { "hasProv": True, "root_event": prov.entity, "prov": prov.tree(), "agents": prov.get_agents(), } )
[docs]@app.route("/file/add", methods=["POST"]) def add_prov_data(): filepath = request.json["filepath"] prov_data = request.json["prov"] is_timestamp = request.json["isTimestamp"] print("timestamp: ", is_timestamp) prov = Provenance(filepath) activity = prov_data["activitySlug"] agents = [x for x in prov_data["agents"] if x] desc = prov_data["comment"] sources = [x for x in prov_data["sources"] if x] primary_sources = [x for x in prov_data["primarySources"] if x] if is_timestamp: started_at = "" ended_at = prov_data["startedAt"] else: started_at = prov_data["startedAt"] ended_at = prov_data["endedAt"] prov.add( agents=agents, description=desc, activity=activity, started_at=started_at, ended_at=ended_at, ) prov.add_sources(sources) for primary_source in primary_sources: prov.add_primary_source(primary_source) prov.save() if not prov: return jsonify({}) return jsonify( { "hasProv": True, "root_event": prov.entity, "prov": prov.tree(), "agents": prov.get_agents(), } )
# CONFIG
[docs]@app.route("/config") def config(): provit_dir = cfg.provit_dir return jsonify({"config": {"provit_dir": provit_dir}})
[docs]@app.route("/config/update") def config_update(): pass
# Provit browser routes
[docs]@app.route("/") @app.route("/directory/") @app.route("/agents/") @app.route("/file/") @app.route("/config/") def index(): return render_template("index.html")
#####
[docs]def start_backend(debug=False): try: app.run(debug=debug, port=PROVIS_PORT, use_reloader=False) except OSError as e: print("Cannot start provis server.") sys.exit(1)
[docs]def start_webbrowser(): time.sleep(1) webbrowser.open("http://localhost:{}".format(PROVIS_PORT))
[docs]def start_provit_browser(debug=False): """ Start provit backend and open webbrowser """ backend_process = Process(target=start_backend, args=(debug,)) backend_process.start() start_webbrowser()