"""Render a small text document stored as YAML nodes linked by ordering edges."""

from sys import argv

from yaml import dump as yaml_dump
from yaml import safe_load as yaml_load


def main():
    """Load a document (path from argv[1], else the bundled sample) and print it.

    Prints the rendered text first, then the YAML dump of the document.
    """
    path = "./testdata/hello_world.yaml"
    if len(argv) > 1:
        path = argv[1]
    doc = Doc.from_file(path)
    print(doc.render())
    print(doc.to_dict())


# https://blog.kevinjahns.de/are-crdts-suitable-for-shared-editing/
class Doc():
    """A document made of content nodes and directed edges that order them."""

    def __init__(self):
        self._nodes = []
        self._edges = []

    @staticmethod
    def from_file(path: str) -> "Doc":
        """Build a Doc from the YAML file at *path*.

        The file is expected to hold a mapping with optional ``nodes`` and
        ``edges`` lists. Missing or empty keys (and an empty file) yield empty
        lists, so output produced by ``to_dict`` -- which drops empty lists --
        can be loaded back.
        """
        with open(path, "r", encoding="utf-8") as f:
            # safe_load returns None for an empty file.
            data = yaml_load(f) or {}
        doc = Doc()
        doc._nodes = [Node.from_dict(i) for i in data.get("nodes") or []]
        doc._edges = [Edge.from_dict(i) for i in data.get("edges") or []]
        return doc

    def to_dict(self) -> str:
        """Return the document serialized as a YAML string.

        NOTE(review): despite the name this returns the ``yaml_dump`` output
        (a string), not a dict; kept as is because ``main`` prints it.
        """
        return yaml_dump(slim_dict({
            "nodes": [node.to_dict() for node in self._nodes],
            "edges": [edge.to_dict() for edge in self._edges],
        }))

    def render(self) -> str:
        """Return the concatenated content of all nodes in document order.

        Nodes are emitted by following the sorted edges (each edge's source,
        then the target of the last edge). Any node not reached that way is
        appended afterwards in id order.

        Side effect: for every node appended in that second phase, a new edge
        from the previously emitted node is added to the document (unless it
        is the very first node emitted).
        """
        # Index nodes by id once instead of rescanning the list per edge.
        nodes_by_id = {}
        for node in self._nodes:
            nodes_by_id.setdefault(node.id(), []).append(node)

        result = []
        sorted_edges = self.sorted_edges()
        for edge in sorted_edges:
            result.extend(nodes_by_id.get(edge.from_id(), []))
        if sorted_edges:
            result.extend(nodes_by_id.get(sorted_edges[-1].to_id(), []))

        # NOTE(review): ids are compared as strings, so ts 10 sorts before
        # ts 2 -- confirm whether numeric ordering is intended.
        for node in sorted(self._nodes, key=lambda x: x.id()):
            if node not in result:
                if result:
                    self._edges.append(Edge.between(result[-1], node))
                result.append(node)
        return "".join(i.content() for i in result)

    def sorted_edges(self) -> list:
        """Return the edges ordered by repeatedly extracting the first edge.

        Raises:
            IndexError: if the remaining edges contain a cycle.
        """
        result = []
        edges = list(self._edges)
        while edges:
            first = self._first_edge(edges)
            # Edge defines no __eq__, so this drops exactly the picked object.
            edges = [i for i in edges if i is not first]
            result.append(first)
        return result

    def _first_edge(self, edges):
        """Pick the edge that starts a chain among *edges*.

        Candidates are edges whose source id is not the target of any other
        edge in *edges*; the one with the smallest source id (string
        comparison) wins, ties resolved by list order.

        Raises:
            IndexError: if every source is also a target (empty input or a
                cycle), i.e. there is no chain start.
        """
        to_ids = {edge.to_id() for edge in edges}
        from_edges = [edge for edge in edges if edge.from_id() not in to_ids]
        if not from_edges:
            raise IndexError(
                "no starting edge: edges are empty or contain a cycle"
            )
        # min() returns the first minimal element, matching a stable sort.
        return min(from_edges, key=lambda x: x.from_id())


class Edge():
    """A directed link between two nodes, each identified by (client_name, ts)."""

    def __init__(self, from_client_name, from_ts, to_client_name, to_ts):
        self._from_client_name = from_client_name
        self._from_ts = from_ts
        self._to_client_name = to_client_name
        self._to_ts = to_ts

    @staticmethod
    def between(a, b) -> "Edge":
        """Return an edge going from node *a* to node *b*."""
        return Edge(
            a.client_name(),
            a.ts(),
            b.client_name(),
            b.ts(),
        )

    @staticmethod
    def from_dict(d) -> "Edge":
        """Build an edge from a mapping with optional ``from``/``to`` entries.

        Missing (or null) endpoints and fields default to client_name ``""``
        and ts ``0``.
        """
        source = d.get("from") or {}
        target = d.get("to") or {}
        return Edge(
            source.get("client_name", ""),
            source.get("ts", 0),
            target.get("client_name", ""),
            target.get("ts", 0),
        )

    def to_dict(self) -> dict:
        """Return the edge as a nested dict with empty fields left out."""
        return slim_dict({
            "from": slim_dict({
                "client_name": self._from_client_name,
                "ts": self._from_ts,
            }),
            "to": slim_dict({
                "client_name": self._to_client_name,
                "ts": self._to_ts,
            }),
        })

    def from_id(self) -> str:
        """Return the id (``ts/client_name``) of the source node."""
        return f'{self._from_ts}/{self._from_client_name}'

    def to_id(self) -> str:
        """Return the id (``ts/client_name``) of the target node."""
        return f'{self._to_ts}/{self._to_client_name}'

    def __str__(self):
        return f'{self.from_id()}..{self.to_id()}'


class Node():
    """A piece of content written by a client at a timestamp; may be deleted."""

    def __init__(self, client_name, ts, content, is_delete):
        self._ts = ts
        self._client_name = client_name
        self._content = content
        self._is_delete = is_delete

    def __str__(self):
        return str(self.to_dict())

    @staticmethod
    def from_dict(d) -> "Node":
        """Build a node from a mapping; missing fields get empty defaults."""
        return Node(
            d.get("client_name", ""),
            d.get("ts", 0),
            d.get("content", ""),
            d.get("is_delete", False),
        )

    def to_dict(self) -> dict:
        """Return the node as a dict with empty fields left out."""
        return slim_dict({
            "client_name": self._client_name,
            "ts": self._ts,
            "content": self._content,
            "is_delete": self._is_delete,
        })

    def clone(self) -> "Node":
        """Return a copy of this node with the same field values."""
        # Argument order must follow __init__: client_name first, then ts.
        return Node(
            self._client_name,
            self._ts,
            self._content,
            self._is_delete,
        )

    def id(self) -> str:
        """Return the node id in the form ``ts/client_name``."""
        return f'{self._ts}/{self._client_name}'

    def content(self):
        """Return the node's content, or ``""`` if the node is deleted."""
        if self.is_delete():
            return ""
        return self._content

    def client_name(self):
        return self._client_name

    def ts(self):
        return self._ts

    def is_delete(self):
        return self._is_delete

    def split(self, idx) -> list:
        """Split the content at *idx* into two new nodes; self is unchanged.

        Returns:
            ``[up_to, starting_at]`` holding ``content[:idx]`` and
            ``content[idx:]``; both copy the remaining fields of this node.
        """
        up_to = self.clone()
        up_to._content = self._content[:idx]
        starting_at = self.clone()
        # Slice the original content, not the already-truncated first half.
        starting_at._content = self._content[idx:]
        return [up_to, starting_at]

    def merge(self, other):
        """Append *other* to this node in place and return self.

        Keeps the larger timestamp, concatenates the content and marks the
        node deleted if either side was deleted. Both nodes must belong to
        the same client.
        """
        assert self._client_name == other._client_name, \
            "cannot merge nodes from different clients"
        self._ts = max(self._ts, other._ts)
        self._content += other._content
        self._is_delete = self._is_delete or other._is_delete
        return self


def slim_dict(d) -> dict:
    """Return *d* without its empty values.

    Truthy values are kept, and so are integers even when zero; ``False``,
    ``None`` and empty strings/containers are dropped.
    """
    return {
        k: v
        for k, v in d.items()
        if v or (isinstance(v, int) and not isinstance(v, bool))
    }


if __name__ == "__main__":
    main()