master
bel 2023-09-16 22:30:53 -06:00
parent 6283d7f321
commit 26e316682d
1 changed files with 0 additions and 179 deletions

View File

@ -1,179 +0,0 @@
from yaml import safe_load as yaml_load
from yaml import dump as yaml_dump
from sys import argv
def main():
path = "./testdata/hello_world.yaml"
if len(argv) > 1:
path = argv[1]
doc = Doc.from_file(path)
print(doc.render())
print(doc.to_dict())
# https://blog.kevinjahns.de/are-crdts-suitable-for-shared-editing/
class Doc():
def __init__(self):
self._nodes = []
self._edges = []
def from_file(path):
doc = Doc()
with open(path, "r") as f:
d = yaml_load(f)
doc._nodes = [Node.from_dict(i) for i in d["nodes"]]
doc._edges = [Edge.from_dict(i) for i in d["edges"]]
return doc
def to_dict(self):
return yaml_dump(slim_dict({
"nodes": [node.to_dict() for node in self._nodes],
"edges": [edge.to_dict() for edge in self._edges],
}))
def render(self):
result = []
sorted_edges = self.sorted_edges()
for edge in sorted_edges:
result.extend([i for i in self._nodes if i.id() == edge.from_id()])
if sorted_edges:
result.extend([i for i in self._nodes if i.id() == sorted_edges[-1].to_id()])
for node in sorted(self._nodes, key=lambda x: x.id()):
if not node in result:
if result:
self._edges.append(Edge.between(result[-1], node))
result.append(node)
return "".join([i.content() for i in result])
def sorted_edges(self):
result = []
edges = [edge for edge in self._edges]
while edges:
first = self._first_edge(edges)
edges = [i for i in edges if i != first]
result.append(first)
return result
def _first_edge(self, edges):
to_ids = [edge.to_id() for edge in edges]
from_ids = [edge.from_id() for edge in edges]
candidates = []
for from_id in from_ids:
if not from_id in to_ids:
candidates.append(from_id)
from_edges = [edge for edge in edges if edge.from_id() in candidates]
return sorted(from_edges, key=lambda x: x.from_id())[0]
class Edge():
def __init__(self, from_client_name, from_ts, to_client_name, to_ts):
self._from_client_name = from_client_name
self._from_ts = from_ts
self._to_client_name = to_client_name
self._to_ts = to_ts
def between(a, b):
return Edge(
a.client_name(),
a.ts(),
b.client_name(),
b.ts(),
)
def from_dict(d):
return Edge(
d.get("from", {}).get("client_name", ""),
d.get("from", {}).get("ts", 0),
d.get("to", {}).get("client_name", ""),
d.get("to", {}).get("ts", 0),
)
def to_dict(self):
return slim_dict({
"from": slim_dict({
"client_name": self._from_client_name,
"ts": self._from_ts,
}),
"to": slim_dict({
"client_name": self._to_client_name,
"ts": self._to_ts,
}),
})
def from_id(self):
return f'{self._from_ts}/{self._from_client_name}'
def to_id(self):
return f'{self._to_ts}/{self._to_client_name}'
def __str__(self):
return f'{self._from_ts}/{self._from_client_name}..{self._to_ts}/{self._to_client_name}'
class Node():
def __init__(self, client_name, ts, content, is_delete):
self._ts = ts
self._client_name = client_name
self._content = content
self._is_delete = is_delete
def __str__(self):
return str(self.to_dict())
def from_dict(d):
return Node(
d.get("client_name", ""),
d.get("ts", 0),
d.get("content", ""),
d.get("is_delete", False),
)
def to_dict(self):
return slim_dict({
"client_name": self._client_name,
"ts": self._ts,
"content": self._content,
"is_delete": self._is_delete,
})
def clone(self):
return Node(
self._ts,
self._client_name,
self._content,
self._is_delete,
)
def id(self):
return f'{self._ts}/{self._client_name}'
def content(self):
if self.is_delete():
return ""
return self._content
def client_name(self):
return self._client_name
def ts(self):
return self._ts
def is_delete(self):
return self._is_delete
def split(self, idx):
up_to = self.clone()
up_to._content = up_to._content[:idx]
starting_at = self.clone()
starting_at._content = up_to._content[idx:]
return [up_to, starting_at]
def merge(self, other):
assert(self._client_name == other._client_name)
self._ts = max(self._ts, other._ts)
self._content += other._content
self._is_delete = self._is_delete or other._is_delete
return self
def slim_dict(d):
return {k:v for k,v in d.items() if v or (isinstance(v, int) and not isinstance(v, bool))}
if __name__ == "__main__":
main()